Skip to content

Commit a9ff742

Browse files
[Feature] Strip ANSI codes from run logs and store them as plain text instead of bytes #2841
1 parent f5c8fcf commit a9ff742

12 files changed

Lines changed: 211 additions & 131 deletions

File tree

runner/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/dstackai/dstack/runner
22

3-
go 1.23
3+
go 1.23.8
44

55
require (
66
github.com/alexellis/go-execute/v2 v2.2.1
@@ -10,6 +10,7 @@ require (
1010
github.com/docker/docker v26.0.0+incompatible
1111
github.com/docker/go-connections v0.5.0
1212
github.com/docker/go-units v0.5.0
13+
github.com/dstackai/ansistrip v0.0.5
1314
github.com/go-git/go-git/v5 v5.12.0
1415
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
1516
github.com/gorilla/websocket v1.5.1
@@ -62,6 +63,7 @@ require (
6263
github.com/russross/blackfriday/v2 v2.1.0 // indirect
6364
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
6465
github.com/skeema/knownhosts v1.2.2 // indirect
66+
github.com/tidwall/btree v1.7.0 // indirect
6567
github.com/tklauser/go-sysconf v0.3.12 // indirect
6668
github.com/tklauser/numcpus v0.6.1 // indirect
6769
github.com/ulikunitz/xz v0.5.12 // indirect

runner/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
4747
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
4848
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
4949
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
50+
github.com/dstackai/ansistrip v0.0.5 h1:yOCjeeeDCeJHLHl/L0crXiLbHa17MPR2n4gPhodio3g=
51+
github.com/dstackai/ansistrip v0.0.5/go.mod h1:w3ejXI0twxDv6bPXhkOaPeYdbwz2nwcrcvFoZGqi9F0=
5052
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
5153
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
5254
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
@@ -171,6 +173,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
171173
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
172174
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
173175
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
176+
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
177+
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
174178
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
175179
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
176180
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=

runner/internal/executor/base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
type Executor interface {
1212
GetHistory(timestamp int64) *schemas.PullResponse
13-
GetJobLogsHistory() []schemas.LogEvent
13+
GetJobWsLogsHistory() []schemas.LogEvent
1414
GetRunnerState() string
1515
Run(ctx context.Context) error
1616
SetCodePath(codePath string)

runner/internal/executor/executor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/creack/pty"
21+
"github.com/dstackai/ansistrip"
2122
"github.com/dstackai/dstack/runner/consts"
2223
"github.com/dstackai/dstack/runner/internal/connections"
2324
"github.com/dstackai/dstack/runner/internal/gerrors"
@@ -47,6 +48,7 @@ type RunExecutor struct {
4748
state string
4849
jobStateHistory []schemas.JobStateEvent
4950
jobLogs *appendWriter
51+
jobWsLogs *appendWriter
5052
runnerLogs *appendWriter
5153
timestamp *MonotonicTimestamp
5254

@@ -86,6 +88,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
8688
state: WaitSubmit,
8789
jobStateHistory: make([]schemas.JobStateEvent, 0),
8890
jobLogs: newAppendWriter(mu, timestamp),
91+
jobWsLogs: newAppendWriter(mu, timestamp),
8992
runnerLogs: newAppendWriter(mu, timestamp),
9093
timestamp: timestamp,
9194

@@ -129,7 +132,8 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
129132
}
130133
}()
131134

132-
logger := io.MultiWriter(runnerLogFile, os.Stdout, ex.runnerLogs)
135+
stripper := ansistrip.NewWriter(ex.runnerLogs, 500*time.Millisecond, 3*time.Second)
136+
logger := io.MultiWriter(runnerLogFile, os.Stdout, stripper)
133137
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
134138
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())
135139

@@ -188,6 +192,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
188192
select {
189193
case <-ctx.Done():
190194
log.Error(ctx, "Job canceled")
195+
stripper.Close()
191196
ex.SetJobState(ctx, types.JobStateTerminated)
192197
return gerrors.Wrap(err)
193198
default:
@@ -196,6 +201,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
196201
select {
197202
case <-timeoutCtx.Done():
198203
log.Error(ctx, "Max duration exceeded", "max_duration", ex.jobSpec.MaxDuration)
204+
stripper.Close()
199205
ex.SetJobStateWithTerminationReason(
200206
ctx,
201207
types.JobStateTerminated,
@@ -206,6 +212,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
206212
default:
207213
}
208214

215+
stripper.Close()
209216
// todo fail reason?
210217
log.Error(ctx, "Exec failed", "err", err)
211218
var exitError *exec.ExitError
@@ -217,6 +224,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
217224
return gerrors.Wrap(err)
218225
}
219226

227+
stripper.Close()
220228
ex.SetJobStateWithExitStatus(ctx, types.JobStateDone, 0)
221229
return nil
222230
}
@@ -431,11 +439,13 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
431439
defer func() { _ = ptm.Close() }()
432440
defer func() { _ = cmd.Wait() }() // release resources if copy fails
433441

434-
logger := io.MultiWriter(jobLogFile, ex.jobLogs)
442+
stripper := ansistrip.NewWriter(ex.jobLogs, 500*time.Millisecond, 3*time.Second)
443+
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
435444
_, err = io.Copy(logger, ptm)
436445
if err != nil && !isPtyError(err) {
437446
return gerrors.Wrap(err)
438447
}
448+
stripper.Close()
439449
return gerrors.Wrap(cmd.Wait())
440450
}
441451

runner/internal/executor/executor_test.go

Lines changed: 104 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/exec"
1111
"path/filepath"
12+
"strings"
1213
"testing"
1314
"time"
1415

@@ -17,8 +18,6 @@ import (
1718
"github.com/stretchr/testify/require"
1819
)
1920

20-
// todo test get history
21-
2221
func TestExecutor_WorkingDir_Current(t *testing.T) {
2322
var b bytes.Buffer
2423
ex := makeTestExecutor(t)
@@ -28,7 +27,8 @@ func TestExecutor_WorkingDir_Current(t *testing.T) {
2827

2928
err := ex.execJob(context.TODO(), io.Writer(&b))
3029
assert.NoError(t, err)
31-
assert.Equal(t, ex.workingDir+"\r\n", b.String())
30+
// Normalize line endings for cross-platform compatibility.
31+
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
3232
}
3333

3434
func TestExecutor_WorkingDir_Nil(t *testing.T) {
@@ -39,7 +39,7 @@ func TestExecutor_WorkingDir_Nil(t *testing.T) {
3939

4040
err := ex.execJob(context.TODO(), io.Writer(&b))
4141
assert.NoError(t, err)
42-
assert.Equal(t, ex.workingDir+"\r\n", b.String())
42+
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
4343
}
4444

4545
func TestExecutor_HomeDir(t *testing.T) {
@@ -49,7 +49,7 @@ func TestExecutor_HomeDir(t *testing.T) {
4949

5050
err := ex.execJob(context.TODO(), io.Writer(&b))
5151
assert.NoError(t, err)
52-
assert.Equal(t, ex.homeDir+"\r\n", b.String())
52+
assert.Equal(t, ex.homeDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
5353
}
5454

5555
func TestExecutor_NonZeroExit(t *testing.T) {
@@ -61,7 +61,7 @@ func TestExecutor_NonZeroExit(t *testing.T) {
6161
assert.Error(t, err)
6262
assert.NotEmpty(t, ex.jobStateHistory)
6363
exitStatus := ex.jobStateHistory[len(ex.jobStateHistory)-1].ExitStatus
64-
assert.NotNil(t, exitStatus, ex.jobStateHistory)
64+
assert.NotNil(t, exitStatus)
6565
assert.Equal(t, 100, *exitStatus)
6666
}
6767

@@ -96,7 +96,7 @@ func TestExecutor_LocalRepo(t *testing.T) {
9696

9797
err = ex.execJob(context.TODO(), io.Writer(&b))
9898
assert.NoError(t, err)
99-
assert.Equal(t, "bar\r\n", b.String())
99+
assert.Equal(t, "bar\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
100100
}
101101

102102
func TestExecutor_Recover(t *testing.T) {
@@ -148,8 +148,8 @@ func TestExecutor_RemoteRepo(t *testing.T) {
148148

149149
err = ex.execJob(context.TODO(), io.Writer(&b))
150150
assert.NoError(t, err)
151-
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
152-
assert.Equal(t, expected, b.String())
151+
expected := fmt.Sprintf("%s\n%s\n%s\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
152+
assert.Equal(t, expected, strings.ReplaceAll(b.String(), "\r\n", "\n"))
153153
}
154154

155155
/* Helpers */
@@ -236,3 +236,98 @@ func TestWriteDstackProfile(t *testing.T) {
236236
assert.Equal(t, value, string(out))
237237
}
238238
}
239+
240+
func TestExecutor_Logs(t *testing.T) {
241+
var b bytes.Buffer
242+
ex := makeTestExecutor(t)
243+
// Use printf to generate ANSI control codes.
244+
// \033[31m = red text, \033[1;32m = bold green text, \033[0m = reset
245+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "printf '\\033[31mRed Hello World\\033[0m\\n' && printf '\\033[1;32mBold Green Line 2\\033[0m\\n' && printf 'Line 3\\n'")
246+
247+
err := ex.execJob(context.TODO(), io.Writer(&b))
248+
assert.NoError(t, err)
249+
250+
logHistory := ex.GetHistory(0).JobLogs
251+
assert.NotEmpty(t, logHistory)
252+
253+
logString := combineLogMessages(logHistory)
254+
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")
255+
256+
expectedOutput := "Red Hello World\nBold Green Line 2\nLine 3\n"
257+
assert.Equal(t, expectedOutput, normalizedLogString, "Should strip ANSI codes from regular logs")
258+
259+
// Verify timestamps are in order
260+
assert.Greater(t, len(logHistory), 0)
261+
for i := 1; i < len(logHistory); i++ {
262+
assert.GreaterOrEqual(t, logHistory[i].Timestamp, logHistory[i-1].Timestamp)
263+
}
264+
}
265+
266+
func TestExecutor_LogsWithErrors(t *testing.T) {
267+
var b bytes.Buffer
268+
ex := makeTestExecutor(t)
269+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "echo 'Success message' && echo 'Error message' >&2 && exit 1")
270+
271+
err := ex.execJob(context.TODO(), io.Writer(&b))
272+
assert.Error(t, err)
273+
274+
logHistory := ex.GetHistory(0).JobLogs
275+
assert.NotEmpty(t, logHistory)
276+
277+
logString := combineLogMessages(logHistory)
278+
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")
279+
280+
expectedOutput := "Success message\nError message\n"
281+
assert.Equal(t, expectedOutput, normalizedLogString)
282+
}
283+
284+
func TestExecutor_LogsAnsiCodeHandling(t *testing.T) {
285+
var b bytes.Buffer
286+
ex := makeTestExecutor(t)
287+
288+
// Test a variety of ANSI escape sequences on stdout and stderr.
289+
cmd := "printf '\\033[31mRed\\033[0m \\033[32mGreen\\033[0m\\n' && " +
290+
"printf '\\033[1mBold\\033[0m \\033[4mUnderline\\033[0m\\n' && " +
291+
"printf '\\033[s\\033[uPlain text\\n' >&2"
292+
293+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, cmd)
294+
295+
err := ex.execJob(context.TODO(), io.Writer(&b))
296+
assert.NoError(t, err)
297+
298+
// 1. Check WebSocket logs, which should preserve ANSI codes.
299+
wsLogHistory := ex.GetJobWsLogsHistory()
300+
assert.NotEmpty(t, wsLogHistory)
301+
wsLogString := combineLogMessages(wsLogHistory)
302+
normalizedWsLogString := strings.ReplaceAll(wsLogString, "\r\n", "\n")
303+
304+
expectedWsOutput := "\033[31mRed\033[0m \033[32mGreen\033[0m\n" +
305+
"\033[1mBold\033[0m \033[4mUnderline\033[0m\n" +
306+
"\033[s\033[uPlain text\n"
307+
assert.Equal(t, expectedWsOutput, normalizedWsLogString, "Websocket logs should preserve ANSI codes")
308+
309+
// 2. Check regular job logs, which should have ANSI codes stripped.
310+
regularLogHistory := ex.GetHistory(0).JobLogs
311+
assert.NotEmpty(t, regularLogHistory)
312+
regularLogString := combineLogMessages(regularLogHistory)
313+
normalizedRegularLogString := strings.ReplaceAll(regularLogString, "\r\n", "\n")
314+
315+
expectedRegularOutput := "Red Green\n" +
316+
"Bold Underline\n" +
317+
"Plain text\n"
318+
assert.Equal(t, expectedRegularOutput, normalizedRegularLogString, "Regular logs should have ANSI codes stripped")
319+
320+
// Verify timestamps are ordered for both log types.
321+
assert.Greater(t, len(wsLogHistory), 0)
322+
for i := 1; i < len(wsLogHistory); i++ {
323+
assert.GreaterOrEqual(t, wsLogHistory[i].Timestamp, wsLogHistory[i-1].Timestamp)
324+
}
325+
}
326+
327+
func combineLogMessages(logHistory []schemas.LogEvent) string {
328+
var logOutput bytes.Buffer
329+
for _, logEvent := range logHistory {
330+
logOutput.Write(logEvent.Message)
331+
}
332+
return logOutput.String()
333+
}

runner/internal/executor/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"github.com/dstackai/dstack/runner/internal/schemas"
55
)
66

7-
func (ex *RunExecutor) GetJobLogsHistory() []schemas.LogEvent {
8-
return ex.jobLogs.history
7+
func (ex *RunExecutor) GetJobWsLogsHistory() []schemas.LogEvent {
8+
return ex.jobWsLogs.history
99
}
1010

1111
func (ex *RunExecutor) GetHistory(timestamp int64) *schemas.PullResponse {

runner/internal/runner/api/ws.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,23 @@ func (s *Server) streamJobLogs(conn *websocket.Conn) {
3434

3535
for {
3636
s.executor.RLock()
37-
jobLogsHistory := s.executor.GetJobLogsHistory()
37+
jobLogsWsHistory := s.executor.GetJobWsLogsHistory()
3838
select {
3939
case <-s.shutdownCh:
40-
if currentPos >= len(jobLogsHistory) {
40+
if currentPos >= len(jobLogsWsHistory) {
4141
s.executor.RUnlock()
4242
close(s.wsDoneCh)
4343
return
4444
}
4545
default:
46-
if currentPos >= len(jobLogsHistory) {
46+
if currentPos >= len(jobLogsWsHistory) {
4747
s.executor.RUnlock()
4848
time.Sleep(100 * time.Millisecond)
4949
continue
5050
}
5151
}
52-
for currentPos < len(jobLogsHistory) {
53-
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsHistory[currentPos].Message); err != nil {
52+
for currentPos < len(jobLogsWsHistory) {
53+
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsWsHistory[currentPos].Message); err != nil {
5454
s.executor.RUnlock()
5555
log.Error(context.TODO(), "Failed to write message", "err", err)
5656
return

src/dstack/_internal/server/services/logs/aws.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from dstack._internal.server.services.logs.base import (
1818
LogStorage,
1919
LogStorageError,
20-
b64encode_raw_message,
2120
datetime_to_unix_time_ms,
2221
unix_time_ms_to_datetime,
2322
)
@@ -238,8 +237,7 @@ def _get_next_batch(
238237
skipped_future_events += 1
239238
continue
240239
cw_event = self._runner_log_event_to_cloudwatch_event(event)
241-
# as message is base64-encoded, length in bytes = length in code points.
242-
message_size = len(cw_event["message"]) + self.MESSAGE_OVERHEAD_SIZE
240+
message_size = len(event.message) + self.MESSAGE_OVERHEAD_SIZE
243241
if message_size > self.MESSAGE_MAX_SIZE:
244242
# we should never hit this limit, as we use `io.Copy` to copy from pty to logs,
245243
# which under the hood uses 32KiB buffer, see runner/internal/executor/executor.go,
@@ -271,7 +269,7 @@ def _runner_log_event_to_cloudwatch_event(
271269
) -> _CloudWatchLogEvent:
272270
return {
273271
"timestamp": runner_log_event.timestamp,
274-
"message": b64encode_raw_message(runner_log_event.message),
272+
"message": runner_log_event.message.decode(),
275273
}
276274

277275
@contextmanager

src/dstack/_internal/server/services/logs/filelog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from dstack._internal.server.services.logs.base import (
1616
LogStorage,
1717
LogStorageError,
18-
b64encode_raw_message,
1918
unix_time_ms_to_datetime,
2019
)
2120

@@ -140,5 +139,5 @@ def _runner_log_event_to_log_event(self, runner_log_event: RunnerLogEvent) -> Lo
140139
return LogEvent(
141140
timestamp=unix_time_ms_to_datetime(runner_log_event.timestamp),
142141
log_source=LogEventSource.STDOUT,
143-
message=b64encode_raw_message(runner_log_event.message),
142+
message=runner_log_event.message.decode(),
144143
)

0 commit comments

Comments
 (0)