Skip to content

Commit 76db7af

Browse files
Andrey Cheptsovclaude
andcommitted
Add per-job hourly log quota enforced on runner
Prevents runaway log costs by limiting log output per job per calendar hour. Default quota is 50MB/hour, configurable via DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR (0 disables). Jobs exceeding the quota are terminated with reason log_quota_exceeded. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c5fbd0a commit 76db7af

10 files changed

Lines changed: 130 additions & 7 deletions

File tree

runner/internal/common/types/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ const (
1010
TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user"
1111
TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server"
1212
TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded"
13+
TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded"
1314
)

runner/internal/runner/executor/executor.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,17 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
261261
default:
262262
}
263263

264+
if errors.Is(err, ErrLogQuotaExceeded) {
265+
log.Error(ctx, "Log quota exceeded", "quota", ex.jobSpec.LogQuotaHour)
266+
ex.SetJobStateWithTerminationReason(
267+
ctx,
268+
schemas.JobStateFailed,
269+
types.TerminationReasonLogQuotaExceeded,
270+
fmt.Sprintf("Job log output exceeded the hourly quota of %d bytes", ex.jobSpec.LogQuotaHour),
271+
)
272+
return fmt.Errorf("log quota exceeded: %w", err)
273+
}
274+
264275
// todo fail reason?
265276
log.Error(ctx, "Exec failed", "err", err)
266277
var exitError *exec.ExitError
@@ -283,6 +294,7 @@ func (ex *RunExecutor) SetJob(body schemas.SubmitBody) {
283294
ex.clusterInfo = body.ClusterInfo
284295
ex.secrets = body.Secrets
285296
ex.repoCredentials = body.RepoCredentials
297+
ex.jobLogs.SetQuota(body.JobSpec.LogQuotaHour)
286298
ex.state = WaitCode
287299
}
288300

@@ -586,18 +598,51 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
586598
defer func() { _ = cmd.Wait() }() // release resources if copy fails
587599

588600
stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
589-
defer func() { _ = stripper.Close() }()
590601
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
591-
_, err = io.Copy(logger, ptm)
592-
if err != nil && !isPtyError(err) {
593-
return fmt.Errorf("copy command output: %w", err)
602+
603+
if err := ex.copyOutputWithQuota(cmd, ptm, stripper, logger); err != nil {
604+
return err
594605
}
595606
if err = cmd.Wait(); err != nil {
596607
return fmt.Errorf("wait for command: %w", err)
597608
}
598609
return nil
599610
}
600611

612+
// copyOutputWithQuota streams process output through the log pipeline and
613+
// monitors for log quota exceeded. The quota signal is out-of-band (via channel)
614+
// because the ansistrip writer is async and swallows downstream write errors.
615+
func (ex *RunExecutor) copyOutputWithQuota(cmd *exec.Cmd, ptm io.Reader, stripper io.Closer, logger io.Writer) error {
616+
copyDone := make(chan error, 1)
617+
go func() {
618+
_, err := io.Copy(logger, ptm)
619+
copyDone <- err
620+
}()
621+
622+
// Wait for either io.Copy to finish or quota to be exceeded.
623+
var copyErr error
624+
select {
625+
case copyErr = <-copyDone:
626+
case <-ex.jobLogs.QuotaExceeded():
627+
_ = cmd.Process.Kill()
628+
<-copyDone
629+
}
630+
631+
// Flush the ansistrip buffer — may also trigger quota exceeded.
632+
_ = stripper.Close()
633+
634+
select {
635+
case <-ex.jobLogs.QuotaExceeded():
636+
return ErrLogQuotaExceeded
637+
default:
638+
}
639+
640+
if copyErr != nil && !isPtyError(copyErr) {
641+
return fmt.Errorf("copy command output: %w", copyErr)
642+
}
643+
return nil
644+
}
645+
601646
// setupGitCredentials must be called from Run after setJobUser
602647
func (ex *RunExecutor) setupGitCredentials(ctx context.Context) (func(), error) {
603648
if ex.repoCredentials == nil {

runner/internal/runner/executor/executor_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,28 @@ func TestExecutor_MaxDuration(t *testing.T) {
141141
assert.ErrorContains(t, err, "killed")
142142
}
143143

144+
func TestExecutor_LogQuota(t *testing.T) {
145+
if testing.Short() {
146+
t.Skip()
147+
}
148+
149+
ex := makeTestExecutor(t)
150+
ex.killDelay = 500 * time.Millisecond
151+
// Output >100 bytes to trigger the quota
152+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "for i in $(seq 1 20); do echo 'This line is long enough to exceed the quota easily'; done")
153+
ex.jobSpec.LogQuotaHour = 100 // 100 bytes
154+
ex.jobLogs.SetQuota(100)
155+
makeCodeTar(t, ex)
156+
157+
err := ex.Run(t.Context())
158+
assert.ErrorContains(t, err, "log quota exceeded")
159+
160+
// Verify the termination state was set
161+
history := ex.GetHistory(0)
162+
lastState := history.JobStates[len(history.JobStates)-1]
163+
assert.Equal(t, schemas.JobStateFailed, lastState.State)
164+
}
165+
144166
func TestExecutor_RemoteRepo(t *testing.T) {
145167
if testing.Short() {
146168
t.Skip()

runner/internal/runner/executor/logs.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,63 @@
11
package executor
22

33
import (
4+
"errors"
45
"sync"
6+
"time"
57

68
"github.com/dstackai/dstack/runner/internal/runner/schemas"
79
)
810

11+
var ErrLogQuotaExceeded = errors.New("log quota exceeded")
12+
913
type appendWriter struct {
1014
mu *sync.RWMutex // shares with executor
1115
history []schemas.LogEvent
1216
timestamp *MonotonicTimestamp // shares with executor
17+
18+
quota int // bytes per hour, 0 = unlimited
19+
bytesInHour int // bytes written in current hour bucket
20+
hourStart int64 // unix timestamp (seconds) of current hour bucket start
21+
quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal)
22+
exceededOnce sync.Once
1323
}
1424

1525
func newAppendWriter(mu *sync.RWMutex, timestamp *MonotonicTimestamp) *appendWriter {
1626
return &appendWriter{
17-
mu: mu,
18-
history: make([]schemas.LogEvent, 0),
19-
timestamp: timestamp,
27+
mu: mu,
28+
history: make([]schemas.LogEvent, 0),
29+
timestamp: timestamp,
30+
quotaExceeded: make(chan struct{}),
2031
}
2132
}
2233

34+
func (w *appendWriter) SetQuota(quota int) {
35+
w.quota = quota
36+
}
37+
38+
// QuotaExceeded returns a channel that is closed when the log quota is exceeded.
39+
func (w *appendWriter) QuotaExceeded() <-chan struct{} {
40+
return w.quotaExceeded
41+
}
42+
2343
func (w *appendWriter) Write(p []byte) (n int, err error) {
2444
w.mu.Lock()
2545
defer w.mu.Unlock()
2646

47+
if w.quota > 0 {
48+
now := time.Now().Unix()
49+
currentHour := (now / 3600) * 3600
50+
if currentHour != w.hourStart {
51+
w.bytesInHour = 0
52+
w.hourStart = currentHour
53+
}
54+
if w.bytesInHour+len(p) > w.quota {
55+
w.exceededOnce.Do(func() { close(w.quotaExceeded) })
56+
return 0, ErrLogQuotaExceeded
57+
}
58+
w.bytesInHour += len(p)
59+
}
60+
2761
pCopy := make([]byte, len(p))
2862
copy(pCopy, p)
2963
w.history = append(w.history, schemas.LogEvent{Message: pCopy, Timestamp: w.timestamp.Next()})

runner/internal/runner/schemas/schemas.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ type JobSpec struct {
7979
Env map[string]string `json:"env"`
8080
SingleBranch bool `json:"single_branch"`
8181
MaxDuration int `json:"max_duration"`
82+
LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited
8283
SSHKey *SSHKey `json:"ssh_key"`
8384
WorkingDir *string `json:"working_dir"`
8485
RepoDir *string `json:"repo_dir"`

src/dstack/_internal/core/models/runs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class JobTerminationReason(str, Enum):
151151
CREATING_CONTAINER_ERROR = "creating_container_error"
152152
EXECUTOR_ERROR = "executor_error"
153153
MAX_DURATION_EXCEEDED = "max_duration_exceeded"
154+
LOG_QUOTA_EXCEEDED = "log_quota_exceeded"
154155

155156
def to_status(self) -> JobStatus:
156157
mapping = {
@@ -173,6 +174,7 @@ def to_status(self) -> JobStatus:
173174
self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
174175
self.EXECUTOR_ERROR: JobStatus.FAILED,
175176
self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED,
177+
self.LOG_QUOTA_EXCEEDED: JobStatus.FAILED,
176178
}
177179
return mapping[self]
178180

@@ -205,6 +207,7 @@ def to_error(self) -> Optional[str]:
205207
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
206208
JobTerminationReason.EXECUTOR_ERROR: "executor error",
207209
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
210+
JobTerminationReason.LOG_QUOTA_EXCEEDED: "log quota exceeded",
208211
}
209212
return error_mapping.get(self)
210213

@@ -275,6 +278,9 @@ class JobSpec(CoreModel):
275278
privileged: bool = False
276279
single_branch: Optional[bool] = None
277280
max_duration: Optional[int]
281+
log_quota_hour: Optional[int] = None
282+
"""`log_quota_hour` is the maximum number of bytes of log output per calendar hour.
283+
`None` means unlimited. Set from `DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR`."""
278284
stop_duration: Optional[int] = None
279285
utilization_policy: Optional[UtilizationPolicy] = None
280286
registry_auth: Optional[RegistryAuth]

src/dstack/_internal/server/schemas/runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class SubmitBody(CoreModel):
8383
"gateway",
8484
"single_branch",
8585
"max_duration",
86+
"log_quota_hour",
8687
"ssh_key",
8788
"working_dir",
8889
"repo_dir",

src/dstack/_internal/server/services/jobs/configurators/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from dstack._internal.core.models.volumes import MountPoint, VolumeMountPoint
4949
from dstack._internal.core.services.profiles import get_retry
5050
from dstack._internal.core.services.ssh.ports import filter_reserved_ports
51+
from dstack._internal.server import settings as server_settings
5152
from dstack._internal.server.services.docker import ImageConfig, get_image_config
5253
from dstack._internal.utils import crypto
5354
from dstack._internal.utils.common import run_async
@@ -169,6 +170,7 @@ async def _get_job_spec(
169170
privileged=self._privileged(),
170171
single_branch=self._single_branch(),
171172
max_duration=self._max_duration(),
173+
log_quota_hour=self._log_quota_hour(),
172174
stop_duration=self._stop_duration(),
173175
utilization_policy=self._utilization_policy(),
174176
registry_auth=self._registry_auth(),
@@ -304,6 +306,10 @@ def _max_duration(self) -> Optional[int]:
304306
return None
305307
return self.run_spec.merged_profile.max_duration
306308

309+
def _log_quota_hour(self) -> Optional[int]:
310+
quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR
311+
return quota if quota > 0 else None
312+
307313
def _stop_duration(self) -> Optional[int]:
308314
if self.run_spec.merged_profile.stop_duration is None:
309315
return DEFAULT_STOP_DURATION

src/dstack/_internal/server/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@
133133

134134
SERVER_TEMPLATES_REPO = os.getenv("DSTACK_SERVER_TEMPLATES_REPO")
135135

136+
# Per-job log quota: maximum bytes of log output per calendar hour. 0 = unlimited.
137+
SERVER_LOG_QUOTA_PER_JOB_HOUR = int(
138+
os.getenv("DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR", 50 * 1024 * 1024) # 50 MB
139+
)
140+
136141
# Development settings
137142

138143
SQL_ECHO_ENABLED = os.getenv("DSTACK_SQL_ECHO_ENABLED") is not None

src/tests/_internal/server/routers/test_runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ def get_dev_env_run_plan_dict(
269269
"replica_group": "0",
270270
"single_branch": False,
271271
"max_duration": None,
272+
"log_quota_hour": 52428800,
272273
"stop_duration": 300,
273274
"utilization_policy": None,
274275
"registry_auth": None,
@@ -506,6 +507,7 @@ def get_dev_env_run_dict(
506507
"replica_group": "0",
507508
"single_branch": False,
508509
"max_duration": None,
510+
"log_quota_hour": 52428800,
509511
"stop_duration": 300,
510512
"utilization_policy": None,
511513
"registry_auth": None,

0 commit comments

Comments
 (0)