Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 130 additions & 4 deletions agent/app/service/ai.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package service

import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"

"github.com/1Panel-dev/1Panel/agent/app/dto"
Expand All @@ -19,6 +22,7 @@ import (
"github.com/1Panel-dev/1Panel/agent/i18n"
"github.com/1Panel-dev/1Panel/agent/utils/cmd"
"github.com/1Panel-dev/1Panel/agent/utils/common"
"github.com/1Panel-dev/1Panel/agent/utils/re"
"github.com/jinzhu/copier"
)

Expand Down Expand Up @@ -107,8 +111,7 @@ func (u *AIToolService) Create(req dto.OllamaModelName) error {
}
go func() {
taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error {
cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour))
return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", info.Name)
return runOllamaPullWithProcess(t, containerName, info.Name)
}, nil)
taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error {
itemSize, err := loadModelSize(info.Name, containerName)
Expand Down Expand Up @@ -162,8 +165,7 @@ func (u *AIToolService) Recreate(req dto.OllamaModelName) error {
}
go func() {
taskItem.AddSubTask(i18n.GetWithName("OllamaModelPull", req.Name), func(t *task.Task) error {
cmdMgr := cmd.NewCommandMgr(cmd.WithTask(*taskItem), cmd.WithTimeout(time.Hour))
return cmdMgr.Run("docker", "exec", containerName, "ollama", "pull", req.Name)
return runOllamaPullWithProcess(t, containerName, req.Name)
}, nil)
taskItem.AddSubTask(i18n.GetWithName("OllamaModelSize", req.Name), func(t *task.Task) error {
itemSize, err := loadModelSize(modelInfo.Name, containerName)
Expand Down Expand Up @@ -394,3 +396,127 @@ func loadModelSize(name string, containerName string) (string, error) {
}
return "", fmt.Errorf("no such model %s in ollama list, std: %s", name, stdout)
}

func runOllamaPullWithProcess(taskItem *task.Task, containerName, modelName string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

cmdItem := exec.CommandContext(ctx, "docker", "exec", containerName, "ollama", "pull", modelName)
cmdItem.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
writer := &ollamaPullLogWriter{taskItem: taskItem}
cmdItem.Stdout = writer
cmdItem.Stderr = writer

if err := cmdItem.Start(); err != nil {
return fmt.Errorf("failed to start ollama pull %s: %w", modelName, err)
}
waitErr := cmdItem.Wait()
writer.Flush()
if ctx.Err() == context.DeadlineExceeded {
if cmdItem.Process != nil && cmdItem.Process.Pid > 0 {
_ = syscall.Kill(-cmdItem.Process.Pid, syscall.SIGKILL)
}
return buserr.New("ErrCmdTimeout")
}
if waitErr == nil {
return nil
}
if len(strings.TrimSpace(writer.errBuf.String())) > 0 {
return fmt.Errorf("%s", strings.TrimSpace(writer.errBuf.String()))
}
return waitErr
}

type ollamaPullLogWriter struct {
taskItem *task.Task
errBuf bytes.Buffer
}

func (w *ollamaPullLogWriter) Write(p []byte) (n int, err error) {
w.errBuf.Write(p)
for _, segment := range splitOllamaPullSegments(string(p)) {
w.logLine(segment)
}
return len(p), nil
}

func (w *ollamaPullLogWriter) Flush() {}

func (w *ollamaPullLogWriter) logLine(line string) {
if w == nil || w.taskItem == nil {
return
}
line = sanitizeOllamaPullLine(line)
if line == "" {
return
}
if prefix := resolveOllamaPullLogPrefix(line); prefix != "" {
_ = replaceOllamaPullLogLine(prefix, line, w.taskItem)
return
}
w.taskItem.Log(line)
}

func sanitizeOllamaPullLine(line string) string {
line = re.StripAnsiControlSeq(line)
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "pulling manifes") {
return ""
}
return line
}

func splitOllamaPullSegments(chunk string) []string {
if chunk == "" {
return nil
}
chunk = strings.ReplaceAll(chunk, "\r", "\n")
parts := strings.Split(chunk, "\x1b[1G")
segments := make([]string, 0, len(parts))
for _, part := range parts {
for _, line := range strings.Split(part, "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
segments = append(segments, line)
}
}
return segments
}

func resolveOllamaPullLogPrefix(line string) string {
if strings.HasPrefix(line, "pulling ") {
if idx := strings.Index(line, ":"); idx > 0 {
return strings.TrimSpace(line[:idx])
}
fields := strings.Fields(line)
if len(fields) >= 2 {
return strings.TrimSpace(fields[0] + " " + fields[1])
}
}
return ""
}

func replaceOllamaPullLogLine(prefix, newLine string, taskItem *task.Task) error {
if taskItem == nil || taskItem.Task == nil {
return nil
}
data, err := os.ReadFile(taskItem.Task.LogFile)
if err != nil {
return err
}
lines := strings.Split(string(data), "\n")
for idx, line := range lines {
trimmed := strings.TrimSpace(line)
if trimmed == "" {
continue
}
if strings.Contains(trimmed, prefix) {
lines[idx] = time.Now().Format("2006/01/02 15:04:05") + " " + newLine
return os.WriteFile(taskItem.Task.LogFile, []byte(strings.Join(lines, "\n")), os.ModePerm)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Stop rewriting the task log file in place

replaceOllamaPullLogLine rewrites taskItem.Task.LogFile with os.WriteFile, but the task logger keeps an already-open descriptor to the same file (from task.NewTask), opened without O_APPEND. After a truncate-and-rewrite, subsequent taskItem.Log(...) calls continue writing at the old offset, which can introduce NUL gaps and corrupt later log lines. This is reproducible whenever a pulling ... progress line is replaced and then any later log entry is emitted (for example subtask success/end markers).

Useful? React with 👍 / 👎.

}
}
taskItem.Log(newLine)
return nil
}
6 changes: 6 additions & 0 deletions agent/utils/re/re.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
DurationWithOptionalUnitPattern = `^(\d+)([smhdw]?)$`
MysqlGroupPattern = `\[*\]`
AnsiEscapePattern = "\x1b\\[[0-9;?]*[A-Za-z]|\x1b=|\x1b>"
AnsiControlSeqPattern = `\x1b\[[0-9;?]*[ -/]*[@-~]`
RecycleBinFilePattern = `_1p_file_1p_(.+)_p_(\d+)_(\d+)`
OrderByValidationPattern = `^[a-zA-Z_][a-zA-Z0-9_]*$`
SQLIdentifierPattern = `^[A-Za-z_][A-Za-z0-9_]*$`
Expand Down Expand Up @@ -59,6 +60,7 @@ func Init() {
DurationWithOptionalUnitPattern,
MysqlGroupPattern,
AnsiEscapePattern,
AnsiControlSeqPattern,
RecycleBinFilePattern,
OrderByValidationPattern,
SQLIdentifierPattern,
Expand All @@ -82,3 +84,7 @@ func GetRegex(pattern string) *regexp.Regexp {
func RegisterRegex(pattern string) {
regexMap[pattern] = regexp.MustCompile(pattern)
}

func StripAnsiControlSeq(value string) string {
return GetRegex(AnsiControlSeqPattern).ReplaceAllString(value, "")
}
Loading