Skip to content

Commit a40786a

Browse files
authored
refactor: clean up ollama pull log parsing (#12414)
1 parent 368193a commit a40786a

File tree

2 files changed

+136
-4
lines changed

2 files changed

+136
-4
lines changed

agent/app/service/ai.go

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package service
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"os"
8+
"os/exec"
79
"path"
810
"strings"
11+
"syscall"
912
"time"
1013

1114
"github.com/1Panel-dev/1Panel/agent/app/dto"
@@ -19,6 +22,7 @@ import (
1922
"github.com/1Panel-dev/1Panel/agent/i18n"
2023
"github.com/1Panel-dev/1Panel/agent/utils/cmd"
2124
"github.com/1Panel-dev/1Panel/agent/utils/common"
25+
"github.com/1Panel-dev/1Panel/agent/utils/re"
2226
"github.com/jinzhu/copier"
2327
)
2428

@@ -107,8 +111,7 @@ func (u *AIToolService) Create(req dto.OllamaModelName) error {
107111
}
108112
go func() {
109113
taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error {
110-
cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour))
111-
return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", info.Name)
114+
return runOllamaPullWithProcess(t, containerName, info.Name)
112115
}, nil)
113116
taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error {
114117
itemSize, err := loadModelSize(info.Name, containerName)
@@ -162,8 +165,7 @@ func (u *AIToolService) Recreate(req dto.OllamaModelName) error {
162165
}
163166
go func() {
164167
taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error {
165-
cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour))
166-
return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", req.Name)
168+
return runOllamaPullWithProcess(t, containerName, req.Name)
167169
}, nil)
168170
taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error {
169171
itemSize, err := loadModelSize(modelInfo.Name, containerName)
@@ -394,3 +396,127 @@ func loadModelSize(name string, containerName string) (string, error) {
394396
}
395397
return "", fmt.Errorf("no such model %s in ollama list, std: %s", name, stdout)
396398
}
399+
400+
func runOllamaPullWithProcess(taskItem *task.Task, containerName, modelName string) error {
401+
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
402+
defer cancel()
403+
404+
cmdItem := exec.CommandContext(ctx, "docker", "exec", containerName, "ollama", "pull", modelName)
405+
cmdItem.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
406+
writer := &ollamaPullLogWriter{taskItem: taskItem}
407+
cmdItem.Stdout = writer
408+
cmdItem.Stderr = writer
409+
410+
if err := cmdItem.Start(); err != nil {
411+
return fmt.Errorf("failed to start ollama pull %s: %w", modelName, err)
412+
}
413+
waitErr := cmdItem.Wait()
414+
writer.Flush()
415+
if ctx.Err() == context.DeadlineExceeded {
416+
if cmdItem.Process != nil && cmdItem.Process.Pid > 0 {
417+
_ = syscall.Kill(-cmdItem.Process.Pid, syscall.SIGKILL)
418+
}
419+
return buserr.New("ErrCmdTimeout")
420+
}
421+
if waitErr == nil {
422+
return nil
423+
}
424+
if len(strings.TrimSpace(writer.errBuf.String())) > 0 {
425+
return fmt.Errorf("%s", strings.TrimSpace(writer.errBuf.String()))
426+
}
427+
return waitErr
428+
}
429+
430+
type ollamaPullLogWriter struct {
431+
taskItem *task.Task
432+
errBuf bytes.Buffer
433+
}
434+
435+
func (w *ollamaPullLogWriter) Write(p []byte) (n int, err error) {
436+
w.errBuf.Write(p)
437+
for _, segment := range splitOllamaPullSegments(string(p)) {
438+
w.logLine(segment)
439+
}
440+
return len(p), nil
441+
}
442+
443+
func (w *ollamaPullLogWriter) Flush() {}
444+
445+
func (w *ollamaPullLogWriter) logLine(line string) {
446+
if w == nil || w.taskItem == nil {
447+
return
448+
}
449+
line = sanitizeOllamaPullLine(line)
450+
if line == "" {
451+
return
452+
}
453+
if prefix := resolveOllamaPullLogPrefix(line); prefix != "" {
454+
_ = replaceOllamaPullLogLine(prefix, line, w.taskItem)
455+
return
456+
}
457+
w.taskItem.Log(line)
458+
}
459+
460+
func sanitizeOllamaPullLine(line string) string {
461+
line = re.StripAnsiControlSeq(line)
462+
line = strings.TrimSpace(line)
463+
if strings.HasPrefix(line, "pulling manifes") {
464+
return ""
465+
}
466+
return line
467+
}
468+
469+
func splitOllamaPullSegments(chunk string) []string {
470+
if chunk == "" {
471+
return nil
472+
}
473+
chunk = strings.ReplaceAll(chunk, "\r", "\n")
474+
parts := strings.Split(chunk, "\x1b[1G")
475+
segments := make([]string, 0, len(parts))
476+
for _, part := range parts {
477+
for _, line := range strings.Split(part, "\n") {
478+
line = strings.TrimSpace(line)
479+
if line == "" {
480+
continue
481+
}
482+
segments = append(segments, line)
483+
}
484+
}
485+
return segments
486+
}
487+
488+
func resolveOllamaPullLogPrefix(line string) string {
489+
if strings.HasPrefix(line, "pulling ") {
490+
if idx := strings.Index(line, ":"); idx > 0 {
491+
return strings.TrimSpace(line[:idx])
492+
}
493+
fields := strings.Fields(line)
494+
if len(fields) >= 2 {
495+
return strings.TrimSpace(fields[0] + " " + fields[1])
496+
}
497+
}
498+
return ""
499+
}
500+
501+
func replaceOllamaPullLogLine(prefix, newLine string, taskItem *task.Task) error {
502+
if taskItem == nil || taskItem.Task == nil {
503+
return nil
504+
}
505+
data, err := os.ReadFile(taskItem.Task.LogFile)
506+
if err != nil {
507+
return err
508+
}
509+
lines := strings.Split(string(data), "\n")
510+
for idx, line := range lines {
511+
trimmed := strings.TrimSpace(line)
512+
if trimmed == "" {
513+
continue
514+
}
515+
if strings.Contains(trimmed, prefix) {
516+
lines[idx] = time.Now().Format("2006/01/02 15:04:05") + " " + newLine
517+
return os.WriteFile(taskItem.Task.LogFile, []byte(strings.Join(lines, "\n")), os.ModePerm)
518+
}
519+
}
520+
taskItem.Log(newLine)
521+
return nil
522+
}

agent/utils/re/re.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const (
2727
DurationWithOptionalUnitPattern = `^(\d+)([smhdw]?)$`
2828
MysqlGroupPattern = `\[*\]`
2929
AnsiEscapePattern = "\x1b\\[[0-9;?]*[A-Za-z]|\x1b=|\x1b>"
30+
AnsiControlSeqPattern = `\x1b\[[0-9;?]*[ -/]*[@-~]`
3031
RecycleBinFilePattern = `_1p_file_1p_(.+)_p_(\d+)_(\d+)`
3132
OrderByValidationPattern = `^[a-zA-Z_][a-zA-Z0-9_]*$`
3233
SQLIdentifierPattern = `^[A-Za-z_][A-Za-z0-9_]*$`
@@ -59,6 +60,7 @@ func Init() {
5960
DurationWithOptionalUnitPattern,
6061
MysqlGroupPattern,
6162
AnsiEscapePattern,
63+
AnsiControlSeqPattern,
6264
RecycleBinFilePattern,
6365
OrderByValidationPattern,
6466
SQLIdentifierPattern,
@@ -82,3 +84,7 @@ func GetRegex(pattern string) *regexp.Regexp {
8284
func RegisterRegex(pattern string) {
8385
regexMap[pattern] = regexp.MustCompile(pattern)
8486
}
87+
88+
func StripAnsiControlSeq(value string) string {
89+
return GetRegex(AnsiControlSeqPattern).ReplaceAllString(value, "")
90+
}

0 commit comments

Comments
 (0)