Skip to content

Commit 620aee7

Browse files
Andrey Cheptsov and claude committed
Add data transfer quota to terminate jobs exceeding outbound traffic limits
Adds a configurable per-job outbound data transfer quota (AWS only) that terminates a job when its total external traffic exceeds the threshold.

Metering uses iptables byte counters on the shim (host level) and excludes private/VPC traffic. The shim notifies the runner via a new /api/terminate endpoint, so the server reads the termination reason through the existing /api/pull flow — the same pattern as the log quota.

Configured via DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS (bytes, 0 = unlimited).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1568525 commit 620aee7

15 files changed

Lines changed: 503 additions & 5 deletions

File tree

runner/internal/common/types/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ const (
1010
TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user"
1111
TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server"
1212
TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded"
13-
TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded"
13+
TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded"
14+
TerminationReasonDataTransferQuotaExceeded TerminationReason = "data_transfer_quota_exceeded"
1415
)

runner/internal/runner/api/http.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ func (s *Server) stopPostHandler(w http.ResponseWriter, r *http.Request) (interf
194194
return nil, nil
195195
}
196196

197+
func (s *Server) terminatePostHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
198+
var body schemas.TerminateBody
199+
if err := api.DecodeJSONBody(w, r, &body, true); err != nil {
200+
return nil, err
201+
}
202+
ctx := r.Context()
203+
log.Error(ctx, "Terminate requested", "reason", body.Reason, "message", body.Message)
204+
// No executor.Lock() needed — SetJobStateWithTerminationReason acquires its own lock.
205+
// Using the external lock would deadlock with io.Copy holding it during job execution.
206+
s.executor.SetJobStateWithTerminationReason(
207+
ctx,
208+
schemas.JobStateFailed,
209+
body.Reason,
210+
body.Message,
211+
)
212+
return nil, nil
213+
}
214+
197215
func isMaxBytesError(err error) bool {
198216
var maxBytesError *http.MaxBytesError
199217
return errors.As(err, &maxBytesError)

runner/internal/runner/api/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func NewServer(ctx context.Context, address string, version string, ex executor.
6868
r.AddHandler("POST", "/api/run", s.runPostHandler)
6969
r.AddHandler("GET", "/api/pull", s.pullGetHandler)
7070
r.AddHandler("POST", "/api/stop", s.stopPostHandler)
71+
r.AddHandler("POST", "/api/terminate", s.terminatePostHandler)
7172
r.AddHandler("GET", "/logs_ws", s.logsWsGetHandler)
7273
return s, nil
7374
}

runner/internal/runner/schemas/schemas.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ type SubmitBody struct {
3939
LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited
4040
}
4141

42+
type TerminateBody struct {
43+
Reason types.TerminationReason `json:"reason"`
44+
Message string `json:"message"`
45+
}
46+
4247
type PullResponse struct {
4348
JobStates []JobStateEvent `json:"job_states"`
4449
JobLogs []LogEvent `json:"job_logs"`

runner/internal/shim/docker.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"fmt"
1111
"io"
12+
"net/http"
1213
"os"
1314
"os/exec"
1415
"os/user"
@@ -37,6 +38,7 @@ import (
3738
"github.com/dstackai/dstack/runner/internal/common/types"
3839
"github.com/dstackai/dstack/runner/internal/shim/backends"
3940
"github.com/dstackai/dstack/runner/internal/shim/host"
41+
"github.com/dstackai/dstack/runner/internal/shim/netmeter"
4042
)
4143

4244
// TODO: Allow for configuration via cli arguments or environment variables.
@@ -380,7 +382,8 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error {
380382
if err := d.tasks.Update(task); err != nil {
381383
return fmt.Errorf("%w: failed to update task %s: %w", ErrInternal, task.ID, err)
382384
}
383-
err = d.waitContainer(ctx, &task)
385+
386+
err = d.waitContainerWithQuota(ctx, &task, cfg)
384387
}
385388
if err != nil {
386389
log.Error(ctx, "failed to run container", "err", err)
@@ -910,6 +913,49 @@ func (d *DockerRunner) waitContainer(ctx context.Context, task *Task) error {
910913
return nil
911914
}
912915

916+
// waitContainerWithQuota waits for the container to finish, optionally enforcing
917+
// a data transfer quota. If the quota is exceeded, it notifies the runner
918+
// (so the server reads the termination reason via /api/pull) and stops the container.
919+
func (d *DockerRunner) waitContainerWithQuota(ctx context.Context, task *Task, cfg TaskConfig) error {
920+
if cfg.DataTransferQuota <= 0 {
921+
return d.waitContainer(ctx, task)
922+
}
923+
924+
nm := netmeter.New(task.ID, cfg.DataTransferQuota)
925+
if err := nm.Start(ctx); err != nil {
926+
errMessage := fmt.Sprintf("data transfer quota configured but metering unavailable: %s", err)
927+
log.Error(ctx, errMessage)
928+
task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage)
929+
return fmt.Errorf("data transfer meter: %w", err)
930+
}
931+
defer nm.Stop()
932+
933+
waitDone := make(chan error, 1)
934+
go func() { waitDone <- d.waitContainer(ctx, task) }()
935+
936+
select {
937+
case err := <-waitDone:
938+
return err
939+
case <-nm.Exceeded():
940+
log.Error(ctx, "Data transfer quota exceeded", "task", task.ID, "quota", cfg.DataTransferQuota)
941+
terminateMsg := fmt.Sprintf("Outbound data transfer exceeded quota of %d bytes", cfg.DataTransferQuota)
942+
if err := terminateRunner(ctx, d.dockerParams.RunnerHTTPPort(),
943+
types.TerminationReasonDataTransferQuotaExceeded, terminateMsg); err != nil {
944+
log.Error(ctx, "failed to notify runner of termination", "err", err)
945+
}
946+
stopTimeout := 10
947+
stopOpts := container.StopOptions{Timeout: &stopTimeout}
948+
if err := d.client.ContainerStop(ctx, task.containerID, stopOpts); err != nil {
949+
log.Error(ctx, "failed to stop container after quota exceeded", "err", err)
950+
}
951+
<-waitDone
952+
// The runner already set the job state with the termination reason.
953+
// The server will read it via /api/pull.
954+
task.SetStatusTerminated(string(types.TerminationReasonDoneByRunner), "")
955+
return nil
956+
}
957+
}
958+
913959
func encodeRegistryAuth(username string, password string) (string, error) {
914960
if username == "" && password == "" {
915961
return "", nil
@@ -1180,6 +1226,31 @@ func getContainerLastLogs(ctx context.Context, client docker.APIClient, containe
11801226
return lines, nil
11811227
}
11821228

1229+
// terminateRunner calls the runner's /api/terminate endpoint to set the job termination state.
1230+
// This allows the server to read the termination reason via /api/pull before the container dies.
1231+
func terminateRunner(ctx context.Context, runnerPort int, reason types.TerminationReason, message string) error {
1232+
url := fmt.Sprintf("http://localhost:%d/api/terminate", runnerPort)
1233+
body := fmt.Sprintf(`{"reason":%q,"message":%q}`, reason, message)
1234+
// 5s is generous for a localhost HTTP call; if the runner doesn't respond in time,
1235+
// we proceed with stopping the container anyway (the server will handle the termination).
1236+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
1237+
defer cancel()
1238+
req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(body))
1239+
if err != nil {
1240+
return fmt.Errorf("create request: %w", err)
1241+
}
1242+
req.Header.Set("Content-Type", "application/json")
1243+
resp, err := http.DefaultClient.Do(req)
1244+
if err != nil {
1245+
return fmt.Errorf("request failed: %w", err)
1246+
}
1247+
defer resp.Body.Close()
1248+
if resp.StatusCode != http.StatusOK {
1249+
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
1250+
}
1251+
return nil
1252+
}
1253+
11831254
/* DockerParameters interface implementation for CLIArgs */
11841255

11851256
func (c *CLIArgs) DockerPrivileged() bool {
@@ -1228,6 +1299,10 @@ func (c *CLIArgs) DockerPorts() []int {
12281299
return []int{c.Runner.HTTPPort, c.Runner.SSHPort}
12291300
}
12301301

1302+
func (c *CLIArgs) RunnerHTTPPort() int {
1303+
return c.Runner.HTTPPort
1304+
}
1305+
12311306
func (c *CLIArgs) MakeRunnerDir(name string) (string, error) {
12321307
runnerTemp := filepath.Join(c.Shim.HomeDir, "runners", name)
12331308
if err := os.MkdirAll(runnerTemp, 0o755); err != nil {

runner/internal/shim/docker_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ func (c *dockerParametersMock) DockerPorts() []int {
123123
return []int{}
124124
}
125125

126+
func (c *dockerParametersMock) RunnerHTTPPort() int {
127+
return 10999
128+
}
129+
126130
// DockerMounts is a stub that reports no mounts and no error.
func (c *dockerParametersMock) DockerMounts(string) ([]mount.Mount, error) {
	var noMounts []mount.Mount
	return noMounts, nil
}

runner/internal/shim/models.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ type DockerParameters interface {
99
DockerShellCommands(authorizedKeys []string, runnerHttpAddress string) []string
1010
DockerMounts(string) ([]mount.Mount, error)
1111
DockerPorts() []int
12+
RunnerHTTPPort() int
1213
MakeRunnerDir(name string) (string, error)
1314
DockerPJRTDevice() string
1415
}
@@ -98,9 +99,10 @@ type TaskConfig struct {
9899
// GPUDevices allows the server to set gpu devices instead of relying on the runner default logic.
99100
// E.g. passing nvidia devices directly instead of using nvidia-container-toolkit.
100101
GPUDevices []GPUDevice `json:"gpu_devices"`
101-
HostSshUser string `json:"host_ssh_user"`
102-
HostSshKeys []string `json:"host_ssh_keys"`
103-
ContainerSshKeys []string `json:"container_ssh_keys"`
102+
HostSshUser string `json:"host_ssh_user"`
103+
HostSshKeys []string `json:"host_ssh_keys"`
104+
ContainerSshKeys []string `json:"container_ssh_keys"`
105+
DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited
104106
}
105107

106108
type TaskListItem struct {

0 commit comments

Comments
 (0)