diff --git a/Dockerfile b/Dockerfile index e54fb1e2..630b2b73 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,13 +2,14 @@ FROM golang:1.25-trixie AS builder ARG IMG_PATH=/opt/pics ARG EXHAUST_PATH=/opt/exhaust + RUN apt update && apt install --no-install-recommends libvips-dev -y && mkdir /build COPY go.mod /build RUN cd /build && go mod download COPY . /build RUN cd /build && sed -i "s|.\/pics|${IMG_PATH}|g" config.json \ - && sed -i "s|\"\"|\"${EXHAUST_PATH}\"|g" config.json \ + && sed -i "s|\"./exhaust\"|\"${EXHAUST_PATH}\"|g" config.json \ && sed -i 's/127.0.0.1/0.0.0.0/g' config.json \ && go build -ldflags="-s -w" -o webp-server . diff --git a/config.json b/config.json index bb3e051b..53934990 100644 --- a/config.json +++ b/config.json @@ -12,5 +12,8 @@ "READ_BUFFER_SIZE": 4096, "CONCURRENCY": 262144, "DISABLE_KEEPALIVE": false, - "CACHE_TTL": 259200 + "CACHE_TTL": 259200, + "MAX_CACHE_SIZE": 0, + "MAX_CONCURRENT_CONVERSIONS": 6, + "MEMORY_LIMIT_MB": 150 } \ No newline at end of file diff --git a/config/config.go b/config/config.go index bf2589fd..5d82d708 100644 --- a/config/config.go +++ b/config/config.go @@ -38,7 +38,9 @@ const ( "CONCURRENCY": 262144, "DISABLE_KEEPALIVE": false, "CACHE_TTL": 259200, - "MAX_CACHE_SIZE": 0 + "MAX_CACHE_SIZE": 0, + "MAX_CONCURRENT_CONVERSIONS": 8, + "MEMORY_LIMIT_MB": 200 }` ) @@ -106,7 +108,9 @@ type WebpConfig struct { DisableKeepalive bool `json:"DISABLE_KEEPALIVE"` CacheTTL int `json:"CACHE_TTL"` // In minutes - MaxCacheSize int `json:"MAX_CACHE_SIZE"` // In MB, for max cached exhausted/metadata files(plus remote-raw if applicable), 0 means no limit + MaxCacheSize int `json:"MAX_CACHE_SIZE"` // In MB, for max cached exhausted/metadata files(plus remote-raw if applicable), 0 means no limit + MaxConcurrentConversions int `json:"MAX_CONCURRENT_CONVERSIONS"` // Maximum number of concurrent image conversions + MemoryLimitMB int `json:"MEMORY_LIMIT_MB"` // Memory limit for conversions in MB } func NewWebPConfig() *WebpConfig { @@ -137,7 +141,9 @@ func NewWebPConfig() *WebpConfig { DisableKeepalive: false, CacheTTL: 259200, - MaxCacheSize: 0, + MaxCacheSize: 0, + MaxConcurrentConversions: 8, + MemoryLimitMB: 200, } } @@ -299,6 +305,24 @@ func LoadConfig() { } } + if os.Getenv("WEBP_MAX_CONCURRENT_CONVERSIONS") != "" { + maxConcurrent, err := strconv.Atoi(os.Getenv("WEBP_MAX_CONCURRENT_CONVERSIONS")) + if err != nil { + log.Warnf("WEBP_MAX_CONCURRENT_CONVERSIONS is not a valid integer, using value in config.json %d", Config.MaxConcurrentConversions) + } else { + Config.MaxConcurrentConversions = maxConcurrent + } + } + + if os.Getenv("WEBP_MEMORY_LIMIT_MB") != "" { + memLimit, err := strconv.Atoi(os.Getenv("WEBP_MEMORY_LIMIT_MB")) + if err != nil { + log.Warnf("WEBP_MEMORY_LIMIT_MB is not a valid integer, using value in config.json %d", Config.MemoryLimitMB) + } else { + Config.MemoryLimitMB = memLimit + } + } + if Config.AllowedTypes[0] == "*" { AllowAllExtensions = true } diff --git a/encoder/encoder.go b/encoder/encoder.go index 5b56e5ac..3552ceb4 100644 --- a/encoder/encoder.go +++ b/encoder/encoder.go @@ -42,67 +42,19 @@ func loadImage(filename string) (*vips.ImageRef, error) { } func ConvertFilter(rawPath, jxlPath, avifPath, webpPath string, extraParams config.ExtraParams, supportedFormats map[string]bool, c chan int) { - // Wait for the conversion to complete and return the converted image - retryDelay := 100 * time.Millisecond // Initial retry delay - - for { - if _, found := config.ConvertLock.Get(rawPath); found { - log.Debugf("file %s is locked under conversion, retrying in %s", rawPath, retryDelay) - time.Sleep(retryDelay) - } else { - // The lock is released, indicating that the conversion is complete - break - } - } - - // If there is a lock here, it means that another thread is converting the same image - // Lock rawPath to prevent concurrent conversion - config.ConvertLock.Set(rawPath, true, -1) - defer config.ConvertLock.Delete(rawPath) - - var wg sync.WaitGroup - wg.Add(3) - if !helper.ImageExists(avifPath) && config.Config.EnableAVIF && supportedFormats["avif"] { - go func() { - err := convertImage(rawPath, avifPath, "avif", extraParams) - if err != nil { - log.Errorln(err) - } - defer wg.Done() - }() - } else { - wg.Done() - } - - if !helper.ImageExists(webpPath) && config.Config.EnableWebP && supportedFormats["webp"] { - go func() { - err := convertImage(rawPath, webpPath, "webp", extraParams) - if err != nil { - log.Errorln(err) - } - defer wg.Done() - }() - } else { - wg.Done() - } - - if !helper.ImageExists(jxlPath) && config.Config.EnableJXL && supportedFormats["jxl"] { - go func() { - err := convertImage(rawPath, jxlPath, "jxl", extraParams) - if err != nil { - log.Errorln(err) - } - defer wg.Done() - }() - } else { - wg.Done() - } - - wg.Wait() - - if c != nil { - c <- 1 + // 使用内存管理器来控制并发 + memManager := GetMemoryManager() + job := &ConversionJob{ + RawPath: rawPath, + JxlPath: jxlPath, + AvifPath: avifPath, + WebpPath: webpPath, + ExtraParams: extraParams, + SupportedFormats: supportedFormats, + Chan: c, } + + memManager.SubmitJob(job) } func convertImage(rawPath, optimizedPath, imageType string, extraParams config.ExtraParams) error { @@ -131,7 +83,14 @@ func convertImage(rawPath, optimizedPath, imageType string, extraParams config.E // Image is only opened here img, err := loadImage(rawPath) - defer img.Close() + if img != nil { + defer img.Close() + } + + if err != nil || img == nil { + log.Errorf("Failed to load image %s: %v", rawPath, err) + return err + } // Pre-process image(auto rotate, resize, etc.) err = preProcessImage(img, imageType, extraParams) @@ -310,3 +269,68 @@ func convertLog(itype, rawPath string, optimizedPath string, quality int) { log.Infof("%s@%d%%: %s->%s %d->%d %.2f%% deflated", itype, quality, rawPath, optimizedPath, oldf.Size(), newf.Size(), float32(newf.Size())/float32(oldf.Size())*100) } + +// convertSync 同步转换函数,用于MemoryManager +func convertSync(rawPath, jxlPath, avifPath, webpPath string, extraParams config.ExtraParams, supportedFormats map[string]bool, c chan int) { + // Wait for the conversion to complete and return the converted image + retryDelay := 100 * time.Millisecond // Initial retry delay + + for { + if _, found := config.ConvertLock.Get(rawPath); found { + log.Debugf("file %s is locked under conversion, retrying in %s", rawPath, retryDelay) + time.Sleep(retryDelay) + } else { + // The lock is released, indicating that the conversion is complete + break + } + } + + // If there is a lock here, it means that another thread is converting the same image + // Lock rawPath to prevent concurrent conversion + config.ConvertLock.Set(rawPath, true, -1) + defer config.ConvertLock.Delete(rawPath) + + var wg sync.WaitGroup + wg.Add(3) + if !helper.ImageExists(avifPath) && config.Config.EnableAVIF && supportedFormats["avif"] { + go func() { + err := convertImage(rawPath, avifPath, "avif", extraParams) + if err != nil { + log.Errorln(err) + } + defer wg.Done() + }() + } else { + wg.Done() + } + + if !helper.ImageExists(webpPath) && config.Config.EnableWebP && supportedFormats["webp"] { + go func() { + err := convertImage(rawPath, webpPath, "webp", extraParams) + if err != nil { + log.Errorln(err) + } + defer wg.Done() + }() + } else { + wg.Done() + } + + if !helper.ImageExists(jxlPath) && config.Config.EnableJXL && supportedFormats["jxl"] { + go func() { + err := convertImage(rawPath, jxlPath, "jxl", extraParams) + if err != nil { + log.Errorln(err) + } + defer wg.Done() + }() + } else { + wg.Done() + } + + wg.Wait() + + if c != nil { + c <- 1 + } +} diff --git a/encoder/memory_manager.go b/encoder/memory_manager.go new file mode 100644 index 00000000..02706a4f --- /dev/null +++ b/encoder/memory_manager.go @@ -0,0 +1,166 @@ +package encoder + +import ( + "runtime" + "sync" + "time" + "webp_server_go/config" + + log "github.com/sirupsen/logrus" +) + +// MemoryManager 管理内存和并发限制 +type MemoryManager struct { + maxConcurrency int + currentJobs int + jobQueue chan *ConversionJob + mu sync.RWMutex + semaphore chan struct{} + memoryLimitMB int64 + currentMemory int64 + memoryMu sync.RWMutex +} + +// ConversionJob 转换任务 +type ConversionJob struct { + RawPath string + JxlPath string + AvifPath string + WebpPath string + ExtraParams config.ExtraParams + SupportedFormats map[string]bool + Chan chan int +} + +var memManager *MemoryManager +var once sync.Once + +// GetMemoryManager 获取内存管理器单例 +func GetMemoryManager() *MemoryManager { + once.Do(func() { + // 使用配置中的参数 + maxConc := config.Config.MaxConcurrentConversions + memLimit := int64(config.Config.MemoryLimitMB) + + // 确保参数在合理范围内 + if maxConc <= 0 { + maxConc = runtime.NumCPU() * 2 // 默认每个CPU核心2个并发 + } + if maxConc > 16 { + maxConc = 16 // 最大16个并发,防止内存爆炸 + } + if maxConc < 2 { + maxConc = 2 // 最少2个并发 + } + + if memLimit <= 0 { + memLimit = 200 // 默认200MB + } + + memManager = &MemoryManager{ + maxConcurrency: maxConc, + jobQueue: make(chan *ConversionJob, maxConc*2), // 队列大小是并发数的2倍 + semaphore: make(chan struct{}, maxConc), + memoryLimitMB: memLimit, + } + + // 启动工作池 + for i := 0; i < maxConc; i++ { + go memManager.worker() + } + + log.Infof("MemoryManager initialized: max_concurrency=%d, memory_limit=%dMB", maxConc, memLimit) + }) + return memManager +} + +// worker 工作协程 +func (m *MemoryManager) worker() { + for job := range m.jobQueue { + // 获取信号量 + m.semaphore <- struct{}{} + + // 检查内存限制 + if !m.checkMemoryLimit() { + log.Warn("Memory limit reached, waiting...") + // 等待内存释放 + for !m.checkMemoryLimit() { + time.Sleep(100 * time.Millisecond) + runtime.GC() // 强制垃圾回收 + } + } + + // 记录开始 + m.mu.Lock() + m.currentJobs++ + m.mu.Unlock() + + // 估算内存使用并记录 + estimatedMemory := m.estimateMemoryUsage(job.RawPath) + m.memoryMu.Lock() + m.currentMemory += estimatedMemory + m.memoryMu.Unlock() + + // 执行转换 + m.processJob(job) + + // 释放信号量和内存计数 + <-m.semaphore + + m.mu.Lock() + m.currentJobs-- + m.mu.Unlock() + + m.memoryMu.Lock() + m.currentMemory -= estimatedMemory + m.memoryMu.Unlock() + + log.Debugf("Job completed. Current jobs: %d, Memory usage: %dMB", + m.currentJobs, m.currentMemory) + } +} + +// checkMemoryLimit 检查内存使用是否超限 +func (m *MemoryManager) checkMemoryLimit() bool { + m.memoryMu.RLock() + defer m.memoryMu.RUnlock() + return m.currentMemory < m.memoryLimitMB +} + +// estimateMemoryUsage 估算转换任务的内存使用量(MB) +func (m *MemoryManager) estimateMemoryUsage(filepath string) int64 { + // 基于文件大小的简单估算 + // 假设转换过程中内存使用是文件大小的5-10倍 + // 这里使用保守估计:20MB + return 20 +} + +// processJob 处理单个转换任务 +func (m *MemoryManager) processJob(job *ConversionJob) { + // 执行实际的转换逻辑 + convertSync(job.RawPath, job.JxlPath, job.AvifPath, job.WebpPath, + job.ExtraParams, job.SupportedFormats, job.Chan) +} + +// SubmitJob 提交转换任务 +func (m *MemoryManager) SubmitJob(job *ConversionJob) { + select { + case m.jobQueue <- job: + log.Debugf("Job submitted to queue: %s", job.RawPath) + default: + // 队列满,直接返回错误 + log.Warn("Job queue is full, rejecting job") + if job.Chan != nil { + close(job.Chan) + } + } +} + +// GetStats 获取统计信息 +func (m *MemoryManager) GetStats() (int, int64, int) { + m.mu.RLock() + m.memoryMu.RLock() + defer m.mu.RUnlock() + defer m.memoryMu.RUnlock() + return m.currentJobs, m.currentMemory, cap(m.jobQueue) - len(m.jobQueue) +} \ No newline at end of file diff --git a/encoder/prefetch.go b/encoder/prefetch.go index b222b947..35909bfc 100644 --- a/encoder/prefetch.go +++ b/encoder/prefetch.go @@ -5,6 +5,7 @@ import ( "os" "path" "path/filepath" + "strings" "time" "webp_server_go/config" "webp_server_go/helper" @@ -14,14 +15,10 @@ import ( ) func PrefetchImages() { - // maximum ongoing prefetch is depending on your core of CPU + // maximum ongoing prefetch is depending on your core of CPU and config var sTime = time.Now() - log.Infof("Prefetching using %d cores", config.Jobs) - var finishChan = make(chan int, config.Jobs) - for range config.Jobs { - finishChan <- 1 - } - + memManager := GetMemoryManager() + //prefetch, recursive through the dir all := helper.FileCount(config.Config.ImgPath) var bar = progressbar.Default(all, "Prefetching...") @@ -33,6 +30,13 @@ func PrefetchImages() { if info.IsDir() { return nil } + // Skip SVG files as they don't need WebP conversion and often cause dimension errors + if strings.ToLower(filepath.Ext(picAbsPath)) == ".svg" { + log.Infof("Skipping SVG file: %s", picAbsPath) + _ = bar.Add(1) + return nil + } + // Only convert files with image extensions, use smaller of config.DefaultAllowedTypes and config.Config.AllowedTypes if helper.CheckAllowedExtension(picAbsPath) { // File type is allowed by user, check if it is an image @@ -62,8 +66,37 @@ func PrefetchImages() { "jxl": true, } - go ConvertFilter(picAbsPath, jxlAbsPath, avifAbsPath, webpAbsPath, config.ExtraParams{Width: 0, Height: 0}, supported, finishChan) - _ = bar.Add(<-finishChan) + // 使用内存管理器进行预转换 + finishChan := make(chan int, 1) + job := &ConversionJob{ + RawPath: picAbsPath, + JxlPath: jxlAbsPath, + AvifPath: avifAbsPath, + WebpPath: webpAbsPath, + ExtraParams: config.ExtraParams{Width: 0, Height: 0}, + SupportedFormats: supported, + Chan: finishChan, + } + + // Add error recovery mechanism + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic while processing %s: %v", picAbsPath, r) + _ = bar.Add(1) + } + }() + + memManager.SubmitJob(job) + + // Add timeout to prevent hanging + select { + case <-finishChan: + _ = bar.Add(1) + case <-time.After(30 * time.Second): + log.Warnf("Timeout processing %s after 30 seconds, skipping...", picAbsPath) + _ = bar.Add(1) + } + return nil }) @@ -72,5 +105,4 @@ func PrefetchImages() { } elapsed := time.Since(sTime) _, _ = fmt.Fprintf(os.Stdout, "Prefetch complete in %s\n\n", elapsed) - } diff --git a/go.mod b/go.mod index d2456dd9..b7c9db7b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/buckket/go-blurhash v1.1.0 github.com/cespare/xxhash v1.1.0 github.com/davidbyttow/govips/v2 v2.16.0 - github.com/gofiber/fiber/v2 v2.52.9 + github.com/gofiber/fiber/v2 v2.52.10 github.com/h2non/filetype v1.1.4-0.20230123234534-cfcd7d097bc4 github.com/h2non/go-is-svg v0.0.0-20160927212452-35e8c4b0612c github.com/jeremytorres/rawparser v1.0.2 diff --git a/go.sum b/go.sum index cd522a04..3c5ce022 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davidbyttow/govips/v2 v2.16.0 h1:1nH/Rbx8qZP1hd+oYL9fYQjAnm1+KorX9s07ZGseQmo= github.com/davidbyttow/govips/v2 v2.16.0/go.mod h1:clH5/IDVmG5eVyc23qYpyi7kmOT0B/1QNTKtci4RkyM= -github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= -github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/fiber/v2 v2.52.10 h1:jRHROi2BuNti6NYXmZ6gbNSfT3zj/8c0xy94GOU5elY= +github.com/gofiber/fiber/v2 v2.52.10/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/handler/router.go b/handler/router.go index 50a81c09..75ff1f05 100644 --- a/handler/router.go +++ b/handler/router.go @@ -1,6 +1,7 @@ package handler import ( + "fmt" "net/http" "net/url" "regexp" @@ -219,5 +220,14 @@ func Convert(c *fiber.Ctx) error { c.Set("Content-Type", contentType) c.Set("X-Compression-Rate", helper.GetCompressionRate(rawImageAbs, finalFilename)) + + // 添加内存管理状态到响应头 + if memManager := encoder.GetMemoryManager(); memManager != nil { + currentJobs, currentMemory, queueSize := memManager.GetStats() + c.Set("X-Memory-Jobs", fmt.Sprintf("%d", currentJobs)) + c.Set("X-Memory-Usage-MB", fmt.Sprintf("%d", currentMemory)) + c.Set("X-Queue-Size", fmt.Sprintf("%d", queueSize)) + } + return c.SendFile(finalFilename) } diff --git a/helper/metadata.go b/helper/metadata.go index 949e3ae0..1f209d27 100644 --- a/helper/metadata.go +++ b/helper/metadata.go @@ -5,6 +5,7 @@ import ( "net/url" "os" "path" + "strings" "webp_server_go/config" "github.com/buckket/go-blurhash" @@ -19,11 +20,37 @@ func getId(p string, subdir string) (id string, filePath string, santizedPath st fileID := HashString(p) return fileID, path.Join(config.Config.RemoteRawPath, subdir, fileID) + path.Ext(p), "" } + + // Parse URL first to separate path and query parsed, _ := url.Parse(p) width := parsed.Query().Get("width") height := parsed.Query().Get("height") max_width := parsed.Query().Get("max_width") max_height := parsed.Query().Get("max_height") + + // Check if the path part is a true absolute filesystem path (not just starting with /) + // True absolute paths are like /opt/pics/image.jpg (used in prefetch) + // Web request paths like /image.jpg should be treated as relative paths + if path.IsAbs(parsed.Path) && strings.HasPrefix(parsed.Path, config.Config.ImgPath) { + // For absolute paths (like in prefetch), use the path directly + // Remove the ImgPath prefix to get relative path + relativePath := strings.TrimPrefix(parsed.Path, config.Config.ImgPath) + if strings.HasPrefix(relativePath, "/") { + relativePath = relativePath[1:] // Remove leading slash + } + if relativePath == "" { + relativePath = parsed.Path // Fallback to original path if trimming removes everything + } + + parsedPath := "/" + relativePath + santizedPath = parsedPath + "?width=" + width + "&height=" + height + "&max_width=" + max_width + "&max_height=" + max_height + id = HashString(santizedPath) + filePath = parsed.Path // Use the absolute path directly + + return id, filePath, santizedPath + } + + // For relative paths (like in web requests) // santizedPath will be /webp_server.jpg?width=200\u0026height=\u0026max_width=\u0026max_height= in local mode when requesting /webp_server.jpg?width=200 // santizedPath will be https://docs.webp.sh/images/webp_server.jpg?width=400 in proxy mode when requesting /images/webp_server.jpg?width=400 with IMG_PATH = https://docs.webp.sh santizedPath = parsed.Path + "?width=" + width + "&height=" + height + "&max_width=" + max_width + "&max_height=" + max_height