11package local_runtime
22
33import (
4+ "bufio"
45 "bytes"
56 "context"
67 "errors"
78 "fmt"
9+ "net/url"
810 "os"
911 "os/exec"
1012 "path"
1113 "path/filepath"
14+ "regexp"
15+ "sort"
1216 "strconv"
1317 "strings"
1418 "sync"
19+ "sync/atomic"
1520 "time"
1621
1722 routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
@@ -37,8 +42,21 @@ func (p *LocalPluginRuntime) prepareUV() (string, error) {
3742func (p * LocalPluginRuntime ) preparePipArgs () []string {
3843 args := []string {"install" }
3944
40- if p .appConfig .PipMirrorUrl != "" {
41- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
45+ // Determine index URL precedence for pip install:
46+ // 1) PipIndexUrl
47+ // 2) PipMirrorUrl (legacy/custom)
48+ // 3) UvIndexUrl (as fallback if user only set UV var)
49+ indexURL := selectURL (p .appConfig .PipIndexUrl , p .appConfig .PipMirrorUrl , p .appConfig .UvIndexUrl )
50+ // Extra index URLs (comma or space separated); fallback to UV extras
51+ extra := p .appConfig .PipExtraIndexUrl
52+ if extra == "" {
53+ extra = p .appConfig .UvExtraIndexUrl
54+ }
55+ args = addIndexArgs (args , indexURL , extra )
56+
57+ // Derive trusted-host from index/extra URLs
58+ for _ , h := range deriveTrustedHosts (indexURL , extra ) {
59+ args = append (args , "--trusted-host" , h )
4260 }
4361
4462 args = append (args , "-r" , "requirements.txt" )
@@ -60,9 +78,17 @@ func (p *LocalPluginRuntime) preparePipArgs() []string {
6078func (p * LocalPluginRuntime ) prepareSyncArgs () []string {
6179 args := []string {"sync" , "--no-dev" }
6280
63- if p .appConfig .PipMirrorUrl != "" {
64- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
81+ // Determine index URL precedence for uv sync:
82+ // 1) UvIndexUrl
83+ // 2) PipMirrorUrl (legacy/custom)
84+ // 3) PipIndexUrl
85+ indexURL := selectURL (p .appConfig .UvIndexUrl , p .appConfig .PipMirrorUrl , p .appConfig .PipIndexUrl )
86+ // Extra index URLs; fallback to pip extras
87+ extra := p .appConfig .UvExtraIndexUrl
88+ if extra == "" {
89+ extra = p .appConfig .PipExtraIndexUrl
6590 }
91+ args = addIndexArgs (args , indexURL , extra )
6692
6793 if p .appConfig .PipVerbose {
6894 args = append (args , "-v" )
@@ -113,6 +139,17 @@ func (p *LocalPluginRuntime) installDependencies(
113139 virtualEnvPath := path .Join (p .State .WorkingPath , ".venv" )
114140 cmd := exec .CommandContext (ctx , uvPath , args ... )
115141 cmd .Env = append (cmd .Env , "VIRTUAL_ENV=" + virtualEnvPath , "PATH=" + os .Getenv ("PATH" ))
142+
143+ // Also provide PIP_TRUSTED_HOST env (space-separated) for pip under uv
144+ pipIndex := selectURL (p .appConfig .PipIndexUrl , p .appConfig .PipMirrorUrl , p .appConfig .UvIndexUrl )
145+ pipExtra := p .appConfig .PipExtraIndexUrl
146+ if pipExtra == "" {
147+ pipExtra = p .appConfig .UvExtraIndexUrl
148+ }
149+ if hosts := deriveTrustedHosts (pipIndex , pipExtra ); len (hosts ) > 0 {
150+ cmd .Env = append (cmd .Env , fmt .Sprintf ("PIP_TRUSTED_HOST=%s" , strings .Join (hosts , " " )))
151+ }
152+
116153 if p .appConfig .HttpProxy != "" {
117154 cmd .Env = append (cmd .Env , fmt .Sprintf ("HTTP_PROXY=%s" , p .appConfig .HttpProxy ))
118155 }
@@ -124,6 +161,20 @@ func (p *LocalPluginRuntime) installDependencies(
124161 }
125162 cmd .Dir = p .State .WorkingPath
126163
164+ // log start with sanitized args
165+ startAt := time .Now ()
166+ sanitized := sanitizeArgs (args )
167+ log .Info ("starting dependency installation" ,
168+ "plugin" , p .Config .Identity (),
169+ "method" , func () string {
170+ if dependencyFileType == pyprojectTomlFile {
171+ return "uv sync"
172+ } else {
173+ return "uv pip install"
174+ }
175+ }(),
176+ "args" , strings .Join (sanitized , " " ))
177+
127178 // get stdout and stderr
128179 stdout , err := cmd .StdoutPipe ()
129180 if err != nil {
@@ -149,26 +200,34 @@ func (p *LocalPluginRuntime) installDependencies(
149200 }()
150201
151202 var errMsg strings.Builder
203+ var errMu sync.Mutex
152204 var wg sync.WaitGroup
153205 wg .Add (2 )
154206
155- lastActiveAt := time .Now ()
207+ var lastActiveAt atomic.Int64
208+ lastActiveAt .Store (time .Now ().UnixNano ())
156209
157210 routine .Submit (routinepkg.Labels {
158211 routinepkg .RoutineLabelKeyModule : "plugin_manager" ,
159212 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
160213 }, func () {
161214 defer wg .Done ()
162- // read stdout
163- buf := make ([]byte , 1024 )
164- for {
165- n , err := stdout .Read (buf )
166- if err != nil {
167- break
168- }
169- // FIXME: move the log to separated layer
170- log .Info ("installing plugin" , "plugin" , p .Config .Identity (), "output" , string (buf [:n ]))
171- lastActiveAt = time .Now ()
215+ // read stdout line by line
216+ scanner := bufio .NewScanner (stdout )
217+ buf := make ([]byte , 0 , 64 * 1024 )
218+ scanner .Buffer (buf , 10 * 1024 * 1024 )
219+ for scanner .Scan () {
220+ line := scanner .Text ()
221+ log .Info ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "line" , line )
222+ lastActiveAt .Store (time .Now ().UnixNano ())
223+ }
224+ if err := scanner .Err (); err != nil {
225+ errMu .Lock ()
226+ errMsg .WriteString ("stdout scan error: " )
227+ errMsg .WriteString (err .Error ())
228+ errMsg .WriteString ("\n " )
229+ errMu .Unlock ()
230+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "scanner_err" , err .Error ())
172231 }
173232 })
174233
@@ -177,22 +236,26 @@ func (p *LocalPluginRuntime) installDependencies(
177236 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
178237 }, func () {
179238 defer wg .Done ()
180- // read stderr
181- buf := make ([]byte , 1024 )
182- for {
183- n , err := stderr .Read (buf )
184- if err != nil && err != os .ErrClosed {
185- lastActiveAt = time .Now ()
186- errMsg .WriteString (string (buf [:n ]))
187- break
188- } else if err == os .ErrClosed {
189- break
190- }
191-
192- if n > 0 {
193- errMsg .WriteString (string (buf [:n ]))
194- lastActiveAt = time .Now ()
195- }
239+ // read stderr line by line
240+ scanner := bufio .NewScanner (stderr )
241+ buf := make ([]byte , 0 , 64 * 1024 )
242+ scanner .Buffer (buf , 10 * 1024 * 1024 )
243+ for scanner .Scan () {
244+ line := scanner .Text ()
245+ errMu .Lock ()
246+ errMsg .WriteString (line )
247+ errMsg .WriteString ("\n " )
248+ errMu .Unlock ()
249+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "line" , line )
250+ lastActiveAt .Store (time .Now ().UnixNano ())
251+ }
252+ if err := scanner .Err (); err != nil {
253+ errMu .Lock ()
254+ errMsg .WriteString ("stderr scan error: " )
255+ errMsg .WriteString (err .Error ())
256+ errMsg .WriteString ("\n " )
257+ errMu .Unlock ()
258+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "scanner_err" , err .Error ())
196259 }
197260 })
198261
@@ -207,14 +270,16 @@ func (p *LocalPluginRuntime) installDependencies(
207270 break
208271 }
209272
210- if time .Since (lastActiveAt ) > time .Duration (
273+ if time .Since (time . Unix ( 0 , lastActiveAt . Load ()) ) > time .Duration (
211274 p .appConfig .PythonEnvInitTimeout ,
212275 )* time .Second {
213276 cmd .Process .Kill ()
277+ errMu .Lock ()
214278 errMsg .WriteString (fmt .Sprintf (
215279 "init process exited due to no activity for %d seconds" ,
216280 p .appConfig .PythonEnvInitTimeout ,
217281 ))
282+ errMu .Unlock ()
218283 break
219284 }
220285 }
@@ -223,12 +288,29 @@ func (p *LocalPluginRuntime) installDependencies(
223288 wg .Wait ()
224289
225290 if err := cmd .Wait (); err != nil {
291+ log .Error ("dependency installation failed" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String (), "error" , err )
226292 return fmt .Errorf ("failed to install dependencies: %s, output: %s" , err , errMsg .String ())
227293 }
228294
295+ log .Info ("dependency installation finished" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String ())
229296 return nil
230297}
231298
299+ // sanitizeArgs redacts credentials in any URL-like arguments to avoid leaking secrets in logs.
300+ func sanitizeArgs (args []string ) []string {
301+ // Match https://user:pass@ and https://user@
302+ reWithPass := regexp .MustCompile (`(https?://)[^/@:]+:[^/@]+@` )
303+ reUserOnly := regexp .MustCompile (`(https?://)[^/@:]+@` )
304+
305+ out := make ([]string , len (args ))
306+ for i , a := range args {
307+ s := reWithPass .ReplaceAllString (a , "${1}****:****@" )
308+ s = reUserOnly .ReplaceAllString (s , "${1}****:****@" )
309+ out [i ] = s
310+ }
311+ return out
312+ }
313+
232314type PythonVirtualEnvironment struct {
233315 pythonInterpreterPath string
234316}
@@ -356,6 +438,59 @@ func (p *LocalPluginRuntime) markVirtualEnvironmentAsValid() error {
356438 return nil
357439}
358440
441+ // splitByCommaOrSpace splits a list like "a,b c" into tokens.
442+ func splitByCommaOrSpace (s string ) []string {
443+ // replace comma with space then split by spaces
444+ s = strings .ReplaceAll (s , "," , " " )
445+ fields := strings .Fields (s )
446+ return fields
447+ }
448+
449+ // selectURL returns the first non-empty URL from the provided list.
450+ func selectURL (urls ... string ) string {
451+ for _ , u := range urls {
452+ if u != "" {
453+ return u
454+ }
455+ }
456+ return ""
457+ }
458+
459+ // addIndexArgs appends index and extra-index URL arguments to args.
460+ func addIndexArgs (args []string , indexURL string , extraIndexURL string ) []string {
461+ if indexURL != "" {
462+ args = append (args , "-i" , indexURL )
463+ }
464+ if extraIndexURL != "" {
465+ for _ , u := range splitByCommaOrSpace (extraIndexURL ) {
466+ if u != "" {
467+ args = append (args , "--extra-index-url" , u )
468+ }
469+ }
470+ }
471+ return args
472+ }
473+
474+ // deriveTrustedHosts parses hostnames from index/extra URLs and returns a de-duplicated list.
475+ func deriveTrustedHosts (indexURL string , extraIndexURL string ) []string {
476+ set := map [string ]struct {}{}
477+ add := func (raw string ) {
478+ if strings .TrimSpace (raw ) == "" { return }
479+ u , err := url .Parse (raw )
480+ if err != nil || u .Host == "" { return }
481+ host := u .Host
482+ if i := strings .Index (host , ":" ); i >= 0 { host = host [:i ] }
483+ set [host ] = struct {}{}
484+ }
485+ add (indexURL )
486+ for _ , raw := range splitByCommaOrSpace (extraIndexURL ) { add (raw ) }
487+ out := make ([]string , 0 , len (set ))
488+ for h := range set { out = append (out , h ) }
489+ // preserve deterministic order: sort hostnames
490+ sort .Strings (out )
491+ return out
492+ }
493+
359494func (p * LocalPluginRuntime ) preCompile (
360495 pythonPath string ,
361496) error {
0 commit comments