Skip to content

Commit 3c9a9d5

Browse files
[Feature] Strip ANSI codes from run logs and store them as plain text instead of bytes (#2876)
1 parent da5c0cc commit 3c9a9d5

File tree

16 files changed

+253
-166
lines changed

16 files changed

+253
-166
lines changed

frontend/src/libs/index.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,6 @@ export const riseRouterException = (status = 404, json = 'Not Found'): never =>
9191
throw new Response(json, { status });
9292
};
9393

94-
export const base64ToArrayBuffer = (base64: string) => {
95-
const binaryString = atob(base64);
96-
const bytes = new Uint8Array(binaryString.length);
97-
for (let i = 0; i < binaryString.length; i++) {
98-
bytes[i] = binaryString.charCodeAt(i);
99-
}
100-
return bytes;
101-
};
102-
10394
export const isValidUrl = (urlString: string) => {
10495
try {
10596
return Boolean(new URL(urlString));

frontend/src/pages/Runs/Details/Logs/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export const Logs: React.FC<IProps> = ({ className, projectName, runName, jobSub
3131

3232
const writeDataToTerminal = (logs: ILogItem[]) => {
3333
logs.forEach((logItem) => {
34-
terminalInstance.current.write(logItem.message);
34+
terminalInstance.current.write(logItem.message.replace(/(?<!\r)\n/g, '\r\n'));
3535
});
3636

3737
fitAddonInstance.current.fit();

frontend/src/services/project.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { API } from 'api';
22
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react';
33

4-
import { base64ToArrayBuffer } from 'libs';
54
import fetchBaseQueryHeaders from 'libs/fetchBaseQueryHeaders';
65

76
// Helper function to transform backend response to frontend format
@@ -131,7 +130,7 @@ export const projectApi = createApi({
131130
transformResponse: (response: { logs: ILogItem[]; next_token: string }) => {
132131
const logs = response.logs.map((logItem) => ({
133132
...logItem,
134-
message: base64ToArrayBuffer(logItem.message as string),
133+
message: logItem.message,
135134
}));
136135

137136
return {

frontend/src/types/log.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
declare interface ILogItem {
22
log_source: 'stdout' | 'stderr';
33
timestamp: string;
4-
message: string | Uint8Array;
4+
message: string;
55
}
66

77
declare type TRequestLogsParams = {

runner/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/dstackai/dstack/runner
22

3-
go 1.23
3+
go 1.23.8
44

55
require (
66
github.com/alexellis/go-execute/v2 v2.2.1
@@ -10,6 +10,7 @@ require (
1010
github.com/docker/docker v26.0.0+incompatible
1111
github.com/docker/go-connections v0.5.0
1212
github.com/docker/go-units v0.5.0
13+
github.com/dstackai/ansistrip v0.0.6
1314
github.com/go-git/go-git/v5 v5.12.0
1415
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
1516
github.com/gorilla/websocket v1.5.1
@@ -62,6 +63,7 @@ require (
6263
github.com/russross/blackfriday/v2 v2.1.0 // indirect
6364
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
6465
github.com/skeema/knownhosts v1.2.2 // indirect
66+
github.com/tidwall/btree v1.7.0 // indirect
6567
github.com/tklauser/go-sysconf v0.3.12 // indirect
6668
github.com/tklauser/numcpus v0.6.1 // indirect
6769
github.com/ulikunitz/xz v0.5.12 // indirect

runner/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
4747
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
4848
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
4949
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
50+
github.com/dstackai/ansistrip v0.0.6 h1:6qqeDNWt8NoqfkY1CxKUvdHpJzBl89LOE3wMwptVpaI=
51+
github.com/dstackai/ansistrip v0.0.6/go.mod h1:w3ejXI0twxDv6bPXhkOaPeYdbwz2nwcrcvFoZGqi9F0=
5052
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
5153
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
5254
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
@@ -171,6 +173,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
171173
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
172174
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
173175
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
176+
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
177+
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
174178
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
175179
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
176180
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=

runner/internal/executor/base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
type Executor interface {
1212
GetHistory(timestamp int64) *schemas.PullResponse
13-
GetJobLogsHistory() []schemas.LogEvent
13+
GetJobWsLogsHistory() []schemas.LogEvent
1414
GetRunnerState() string
1515
Run(ctx context.Context) error
1616
SetCodePath(codePath string)

runner/internal/executor/executor.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/creack/pty"
22+
"github.com/dstackai/ansistrip"
2223
"github.com/dstackai/dstack/runner/consts"
2324
"github.com/dstackai/dstack/runner/internal/connections"
2425
"github.com/dstackai/dstack/runner/internal/gerrors"
@@ -28,6 +29,18 @@ import (
2829
"github.com/prometheus/procfs"
2930
)
3031

32+
// TODO: Tune these parameters for optimal experience/performance
33+
const (
34+
// Output is flushed when the cursor doesn't move for this duration
35+
AnsiStripFlushInterval = 500 * time.Millisecond
36+
37+
// Output is flushed regardless of cursor activity after this maximum delay
38+
AnsiStripMaxDelay = 3 * time.Second
39+
40+
// Maximum buffer size for ansistrip
41+
MaxBufferSize = 32 * 1024 // 32KB
42+
)
43+
3144
type ConnectionTracker interface {
3245
GetNoConnectionsSecs() int64
3346
Track(ticker <-chan time.Time)
@@ -54,6 +67,7 @@ type RunExecutor struct {
5467
state string
5568
jobStateHistory []schemas.JobStateEvent
5669
jobLogs *appendWriter
70+
jobWsLogs *appendWriter
5771
runnerLogs *appendWriter
5872
timestamp *MonotonicTimestamp
5973

@@ -110,6 +124,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
110124
state: WaitSubmit,
111125
jobStateHistory: make([]schemas.JobStateEvent, 0),
112126
jobLogs: newAppendWriter(mu, timestamp),
127+
jobWsLogs: newAppendWriter(mu, timestamp),
113128
runnerLogs: newAppendWriter(mu, timestamp),
114129
timestamp: timestamp,
115130

@@ -153,7 +168,9 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
153168
}
154169
}()
155170

156-
logger := io.MultiWriter(runnerLogFile, os.Stdout, ex.runnerLogs)
171+
stripper := ansistrip.NewWriter(ex.runnerLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
172+
defer stripper.Close()
173+
logger := io.MultiWriter(runnerLogFile, os.Stdout, stripper)
157174
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
158175
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())
159176

@@ -455,7 +472,9 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
455472
defer func() { _ = ptm.Close() }()
456473
defer func() { _ = cmd.Wait() }() // release resources if copy fails
457474

458-
logger := io.MultiWriter(jobLogFile, ex.jobLogs)
475+
stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
476+
defer stripper.Close()
477+
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
459478
_, err = io.Copy(logger, ptm)
460479
if err != nil && !isPtyError(err) {
461480
return gerrors.Wrap(err)

runner/internal/executor/executor_test.go

Lines changed: 104 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/exec"
1111
"path/filepath"
12+
"strings"
1213
"testing"
1314
"time"
1415

@@ -17,8 +18,6 @@ import (
1718
"github.com/stretchr/testify/require"
1819
)
1920

20-
// todo test get history
21-
2221
func TestExecutor_WorkingDir_Current(t *testing.T) {
2322
var b bytes.Buffer
2423
ex := makeTestExecutor(t)
@@ -28,7 +27,8 @@ func TestExecutor_WorkingDir_Current(t *testing.T) {
2827

2928
err := ex.execJob(context.TODO(), io.Writer(&b))
3029
assert.NoError(t, err)
31-
assert.Equal(t, ex.workingDir+"\r\n", b.String())
30+
// Normalize line endings for cross-platform compatibility.
31+
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
3232
}
3333

3434
func TestExecutor_WorkingDir_Nil(t *testing.T) {
@@ -39,7 +39,7 @@ func TestExecutor_WorkingDir_Nil(t *testing.T) {
3939

4040
err := ex.execJob(context.TODO(), io.Writer(&b))
4141
assert.NoError(t, err)
42-
assert.Equal(t, ex.workingDir+"\r\n", b.String())
42+
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
4343
}
4444

4545
func TestExecutor_HomeDir(t *testing.T) {
@@ -49,7 +49,7 @@ func TestExecutor_HomeDir(t *testing.T) {
4949

5050
err := ex.execJob(context.TODO(), io.Writer(&b))
5151
assert.NoError(t, err)
52-
assert.Equal(t, ex.homeDir+"\r\n", b.String())
52+
assert.Equal(t, ex.homeDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
5353
}
5454

5555
func TestExecutor_NonZeroExit(t *testing.T) {
@@ -61,7 +61,7 @@ func TestExecutor_NonZeroExit(t *testing.T) {
6161
assert.Error(t, err)
6262
assert.NotEmpty(t, ex.jobStateHistory)
6363
exitStatus := ex.jobStateHistory[len(ex.jobStateHistory)-1].ExitStatus
64-
assert.NotNil(t, exitStatus, ex.jobStateHistory)
64+
assert.NotNil(t, exitStatus)
6565
assert.Equal(t, 100, *exitStatus)
6666
}
6767

@@ -96,7 +96,7 @@ func TestExecutor_LocalRepo(t *testing.T) {
9696

9797
err = ex.execJob(context.TODO(), io.Writer(&b))
9898
assert.NoError(t, err)
99-
assert.Equal(t, "bar\r\n", b.String())
99+
assert.Equal(t, "bar\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
100100
}
101101

102102
func TestExecutor_Recover(t *testing.T) {
@@ -148,8 +148,8 @@ func TestExecutor_RemoteRepo(t *testing.T) {
148148

149149
err = ex.execJob(context.TODO(), io.Writer(&b))
150150
assert.NoError(t, err)
151-
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
152-
assert.Equal(t, expected, b.String())
151+
expected := fmt.Sprintf("%s\n%s\n%s\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
152+
assert.Equal(t, expected, strings.ReplaceAll(b.String(), "\r\n", "\n"))
153153
}
154154

155155
/* Helpers */
@@ -236,3 +236,98 @@ func TestWriteDstackProfile(t *testing.T) {
236236
assert.Equal(t, value, string(out))
237237
}
238238
}
239+
240+
func TestExecutor_Logs(t *testing.T) {
241+
var b bytes.Buffer
242+
ex := makeTestExecutor(t)
243+
// Use printf to generate ANSI control codes.
244+
// \033[31m = red text, \033[1;32m = bold green text, \033[0m = reset
245+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "printf '\\033[31mRed Hello World\\033[0m\\n' && printf '\\033[1;32mBold Green Line 2\\033[0m\\n' && printf 'Line 3\\n'")
246+
247+
err := ex.execJob(context.TODO(), io.Writer(&b))
248+
assert.NoError(t, err)
249+
250+
logHistory := ex.GetHistory(0).JobLogs
251+
assert.NotEmpty(t, logHistory)
252+
253+
logString := combineLogMessages(logHistory)
254+
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")
255+
256+
expectedOutput := "Red Hello World\nBold Green Line 2\nLine 3\n"
257+
assert.Equal(t, expectedOutput, normalizedLogString, "Should strip ANSI codes from regular logs")
258+
259+
// Verify timestamps are in order
260+
assert.Greater(t, len(logHistory), 0)
261+
for i := 1; i < len(logHistory); i++ {
262+
assert.GreaterOrEqual(t, logHistory[i].Timestamp, logHistory[i-1].Timestamp)
263+
}
264+
}
265+
266+
func TestExecutor_LogsWithErrors(t *testing.T) {
267+
var b bytes.Buffer
268+
ex := makeTestExecutor(t)
269+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "echo 'Success message' && echo 'Error message' >&2 && exit 1")
270+
271+
err := ex.execJob(context.TODO(), io.Writer(&b))
272+
assert.Error(t, err)
273+
274+
logHistory := ex.GetHistory(0).JobLogs
275+
assert.NotEmpty(t, logHistory)
276+
277+
logString := combineLogMessages(logHistory)
278+
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")
279+
280+
expectedOutput := "Success message\nError message\n"
281+
assert.Equal(t, expectedOutput, normalizedLogString)
282+
}
283+
284+
func TestExecutor_LogsAnsiCodeHandling(t *testing.T) {
285+
var b bytes.Buffer
286+
ex := makeTestExecutor(t)
287+
288+
// Test a variety of ANSI escape sequences on stdout and stderr.
289+
cmd := "printf '\\033[31mRed\\033[0m \\033[32mGreen\\033[0m\\n' && " +
290+
"printf '\\033[1mBold\\033[0m \\033[4mUnderline\\033[0m\\n' && " +
291+
"printf '\\033[s\\033[uPlain text\\n' >&2"
292+
293+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, cmd)
294+
295+
err := ex.execJob(context.TODO(), io.Writer(&b))
296+
assert.NoError(t, err)
297+
298+
// 1. Check WebSocket logs, which should preserve ANSI codes.
299+
wsLogHistory := ex.GetJobWsLogsHistory()
300+
assert.NotEmpty(t, wsLogHistory)
301+
wsLogString := combineLogMessages(wsLogHistory)
302+
normalizedWsLogString := strings.ReplaceAll(wsLogString, "\r\n", "\n")
303+
304+
expectedWsOutput := "\033[31mRed\033[0m \033[32mGreen\033[0m\n" +
305+
"\033[1mBold\033[0m \033[4mUnderline\033[0m\n" +
306+
"\033[s\033[uPlain text\n"
307+
assert.Equal(t, expectedWsOutput, normalizedWsLogString, "Websocket logs should preserve ANSI codes")
308+
309+
// 2. Check regular job logs, which should have ANSI codes stripped.
310+
regularLogHistory := ex.GetHistory(0).JobLogs
311+
assert.NotEmpty(t, regularLogHistory)
312+
regularLogString := combineLogMessages(regularLogHistory)
313+
normalizedRegularLogString := strings.ReplaceAll(regularLogString, "\r\n", "\n")
314+
315+
expectedRegularOutput := "Red Green\n" +
316+
"Bold Underline\n" +
317+
"Plain text\n"
318+
assert.Equal(t, expectedRegularOutput, normalizedRegularLogString, "Regular logs should have ANSI codes stripped")
319+
320+
// Verify timestamps are ordered for both log types.
321+
assert.Greater(t, len(wsLogHistory), 0)
322+
for i := 1; i < len(wsLogHistory); i++ {
323+
assert.GreaterOrEqual(t, wsLogHistory[i].Timestamp, wsLogHistory[i-1].Timestamp)
324+
}
325+
}
326+
327+
func combineLogMessages(logHistory []schemas.LogEvent) string {
328+
var logOutput bytes.Buffer
329+
for _, logEvent := range logHistory {
330+
logOutput.Write(logEvent.Message)
331+
}
332+
return logOutput.String()
333+
}

runner/internal/executor/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"github.com/dstackai/dstack/runner/internal/schemas"
55
)
66

7-
func (ex *RunExecutor) GetJobLogsHistory() []schemas.LogEvent {
8-
return ex.jobLogs.history
7+
func (ex *RunExecutor) GetJobWsLogsHistory() []schemas.LogEvent {
8+
return ex.jobWsLogs.history
99
}
1010

1111
func (ex *RunExecutor) GetHistory(timestamp int64) *schemas.PullResponse {

0 commit comments

Comments
 (0)