diff --git a/agent/app/service/ai.go b/agent/app/service/ai.go index 07d7912ffb80..9c9d68ba3593 100644 --- a/agent/app/service/ai.go +++ b/agent/app/service/ai.go @@ -1,11 +1,14 @@ package service import ( + "bytes" "context" "fmt" "os" + "os/exec" "path" "strings" + "syscall" "time" "github.com/1Panel-dev/1Panel/agent/app/dto" @@ -19,6 +22,7 @@ import ( "github.com/1Panel-dev/1Panel/agent/i18n" "github.com/1Panel-dev/1Panel/agent/utils/cmd" "github.com/1Panel-dev/1Panel/agent/utils/common" + "github.com/1Panel-dev/1Panel/agent/utils/re" "github.com/jinzhu/copier" ) @@ -107,8 +111,7 @@ func (u *AIToolService) Create(req dto.OllamaModelName) error { } go func() { taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error { - cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour)) - return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", info.Name) + return runOllamaPullWithProcess(t, containerName, info.Name) }, nil) taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error { itemSize, err := loadModelSize(info.Name, containerName) @@ -162,8 +165,7 @@ func (u *AIToolService) Recreate(req dto.OllamaModelName) error { } go func() { taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error { - cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour)) - return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", req.Name) + return runOllamaPullWithProcess(t, containerName, req.Name) }, nil) taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error { itemSize, err := loadModelSize(modelInfo.Name, containerName) @@ -394,3 +396,127 @@ func loadModelSize(name string, containerName string) (string, error) { } return "", fmt.Errorf("no such model %s in ollama list, std: %s", name, stdout) } + +func runOllamaPullWithProcess(taskItem *task.Task, containerName, modelName string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + defer cancel() + + cmdItem := exec.CommandContext(ctx, "docker", "exec", containerName, "ollama", "pull", modelName) + cmdItem.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + writer := &ollamaPullLogWriter{taskItem: taskItem} + cmdItem.Stdout = writer + cmdItem.Stderr = writer + + if err := cmdItem.Start(); err != nil { + return fmt.Errorf("failed to start ollama pull %s: %w", modelName, err) + } + waitErr := cmdItem.Wait() + writer.Flush() + if ctx.Err() == context.DeadlineExceeded { + if cmdItem.Process != nil && cmdItem.Process.Pid > 0 { + _ = syscall.Kill(-cmdItem.Process.Pid, syscall.SIGKILL) + } + return buserr.New("ErrCmdTimeout") + } + if waitErr == nil { + return nil + } + if len(strings.TrimSpace(writer.errBuf.String())) > 0 { + return fmt.Errorf("%s", strings.TrimSpace(writer.errBuf.String())) + } + return waitErr +} + +type ollamaPullLogWriter struct { + taskItem *task.Task + errBuf bytes.Buffer +} + +func (w *ollamaPullLogWriter) Write(p []byte) (n int, err error) { + w.errBuf.Write(p) + for _, segment := range splitOllamaPullSegments(string(p)) { + w.logLine(segment) + } + return len(p), nil +} + +func (w *ollamaPullLogWriter) Flush() {} + +func (w *ollamaPullLogWriter) logLine(line string) { + if w == nil || w.taskItem == nil { + return + } + line = sanitizeOllamaPullLine(line) + if line == "" { + return + } + if prefix := resolveOllamaPullLogPrefix(line); prefix != "" { + _ = replaceOllamaPullLogLine(prefix, line, w.taskItem) + return + } + w.taskItem.Log(line) +} + +func sanitizeOllamaPullLine(line string) string { + line = re.StripAnsiControlSeq(line) + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "pulling manifes") { + return "" + } + return line +} + +func splitOllamaPullSegments(chunk string) []string { + if chunk == "" { + return nil + } + chunk = strings.ReplaceAll(chunk, "\r", "\n") + parts := strings.Split(chunk, "\x1b[1G") + segments := make([]string, 0, len(parts)) + for _, part := range parts { + for _, line := range strings.Split(part, "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + segments = append(segments, line) + } + } + return segments +} + +func resolveOllamaPullLogPrefix(line string) string { + if strings.HasPrefix(line, "pulling ") { + if idx := strings.Index(line, ":"); idx > 0 { + return strings.TrimSpace(line[:idx]) + } + fields := strings.Fields(line) + if len(fields) >= 2 { + return strings.TrimSpace(fields[0] + " " + fields[1]) + } + } + return "" +} + +func replaceOllamaPullLogLine(prefix, newLine string, taskItem *task.Task) error { + if taskItem == nil || taskItem.Task == nil { + return nil + } + data, err := os.ReadFile(taskItem.Task.LogFile) + if err != nil { + return err + } + lines := strings.Split(string(data), "\n") + for idx, line := range lines { + trimmed := strings.TrimSpace(line) + if trimmed == "" { + continue + } + if strings.Contains(trimmed, prefix) { + lines[idx] = time.Now().Format("2006/01/02 15:04:05") + " " + newLine + return os.WriteFile(taskItem.Task.LogFile, []byte(strings.Join(lines, "\n")), os.ModePerm) + } + } + taskItem.Log(newLine) + return nil +} diff --git a/agent/utils/re/re.go b/agent/utils/re/re.go index c781c183fa63..6b008d27022d 100644 --- a/agent/utils/re/re.go +++ b/agent/utils/re/re.go @@ -27,6 +27,7 @@ const ( DurationWithOptionalUnitPattern = `^(\d+)([smhdw]?)$` MysqlGroupPattern = `\[*\]` AnsiEscapePattern = "\x1b\\[[0-9;?]*[A-Za-z]|\x1b=|\x1b>" + AnsiControlSeqPattern = `\x1b\[[0-9;?]*[ -/]*[@-~]` RecycleBinFilePattern = `_1p_file_1p_(.+)_p_(\d+)_(\d+)` OrderByValidationPattern = `^[a-zA-Z_][a-zA-Z0-9_]*$` SQLIdentifierPattern = `^[A-Za-z_][A-Za-z0-9_]*$` @@ -59,6 +60,7 @@ func Init() { DurationWithOptionalUnitPattern, MysqlGroupPattern, AnsiEscapePattern, + AnsiControlSeqPattern, RecycleBinFilePattern, OrderByValidationPattern, SQLIdentifierPattern, @@ -82,3 +84,7 @@ func GetRegex(pattern string) *regexp.Regexp { func RegisterRegex(pattern string) { regexMap[pattern] = regexp.MustCompile(pattern) } + +func StripAnsiControlSeq(value string) string { + return GetRegex(AnsiControlSeqPattern).ReplaceAllString(value, "") +}