11package local_runtime
22
33import (
4+ "bufio"
45 "bytes"
56 "context"
67 "errors"
78 "fmt"
9+ "net/url"
810 "os"
911 "os/exec"
1012 "path"
1113 "path/filepath"
14+ "regexp"
15+ "sort"
1216 "strconv"
1317 "strings"
1418 "sync"
19+ "sync/atomic"
1520 "time"
1621
1722 routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
@@ -37,8 +42,15 @@ func (p *LocalPluginRuntime) prepareUV() (string, error) {
3742func (p * LocalPluginRuntime ) preparePipArgs () []string {
3843 args := []string {"install" }
3944
40- if p .appConfig .PipMirrorUrl != "" {
41- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
45+ // Determine index URL precedence for pip install:
46+ indexURL := p .appConfig .PipMirrorUrl
47+ // Extra index URLs (comma or space separated); fallback to UV extras
48+ extra := p .appConfig .PipExtraIndexUrl
49+ args = addIndexArgs (args , indexURL , extra )
50+
51+ // Derive trusted-host from index/extra URLs
52+ for _ , h := range deriveTrustedHosts (indexURL , extra ) {
53+ args = append (args , "--trusted-host" , h )
4254 }
4355
4456 args = append (args , "-r" , "requirements.txt" )
@@ -60,9 +72,11 @@ func (p *LocalPluginRuntime) preparePipArgs() []string {
6072func (p * LocalPluginRuntime ) prepareSyncArgs () []string {
6173 args := []string {"sync" , "--no-dev" }
6274
63- if p .appConfig .PipMirrorUrl != "" {
64- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
65- }
75+ // Determine index URL precedence for uv sync:
76+ indexURL := p .appConfig .PipMirrorUrl
77+ // Extra index URLs; fallback to pip extras
78+ extra := p .appConfig .PipExtraIndexUrl
79+ args = addIndexArgs (args , indexURL , extra )
6680
6781 if p .appConfig .PipVerbose {
6882 args = append (args , "-v" )
@@ -113,6 +127,15 @@ func (p *LocalPluginRuntime) installDependencies(
113127 virtualEnvPath := path .Join (p .State .WorkingPath , ".venv" )
114128 cmd := exec .CommandContext (ctx , uvPath , args ... )
115129 cmd .Env = append (cmd .Env , "VIRTUAL_ENV=" + virtualEnvPath , "PATH=" + os .Getenv ("PATH" ))
130+
131+ // Also provide PIP_TRUSTED_HOST env (space-separated) for pip under uv
132+ pipIndex := p .appConfig .PipMirrorUrl
133+ pipExtra := p .appConfig .PipExtraIndexUrl
134+
135+ if hosts := deriveTrustedHosts (pipIndex , pipExtra ); len (hosts ) > 0 {
136+ cmd .Env = append (cmd .Env , fmt .Sprintf ("PIP_TRUSTED_HOST=%s" , strings .Join (hosts , " " )))
137+ }
138+
116139 if p .appConfig .HttpProxy != "" {
117140 cmd .Env = append (cmd .Env , fmt .Sprintf ("HTTP_PROXY=%s" , p .appConfig .HttpProxy ))
118141 }
@@ -124,6 +147,20 @@ func (p *LocalPluginRuntime) installDependencies(
124147 }
125148 cmd .Dir = p .State .WorkingPath
126149
150+ // log start with sanitized args
151+ startAt := time .Now ()
152+ sanitized := sanitizeArgs (args )
153+ log .Info ("starting dependency installation" ,
154+ "plugin" , p .Config .Identity (),
155+ "method" , func () string {
156+ if dependencyFileType == pyprojectTomlFile {
157+ return "uv sync"
158+ } else {
159+ return "uv pip install"
160+ }
161+ }(),
162+ "args" , strings .Join (sanitized , " " ))
163+
127164 // get stdout and stderr
128165 stdout , err := cmd .StdoutPipe ()
129166 if err != nil {
@@ -149,26 +186,34 @@ func (p *LocalPluginRuntime) installDependencies(
149186 }()
150187
151188 var errMsg strings.Builder
189+ var errMu sync.Mutex
152190 var wg sync.WaitGroup
153191 wg .Add (2 )
154192
155- lastActiveAt := time .Now ()
193+ var lastActiveAt atomic.Int64
194+ lastActiveAt .Store (time .Now ().UnixNano ())
156195
157196 routine .Submit (routinepkg.Labels {
158197 routinepkg .RoutineLabelKeyModule : "plugin_manager" ,
159198 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
160199 }, func () {
161200 defer wg .Done ()
162- // read stdout
163- buf := make ([]byte , 1024 )
164- for {
165- n , err := stdout .Read (buf )
166- if err != nil {
167- break
168- }
169- // FIXME: move the log to separated layer
170- log .Info ("installing plugin" , "plugin" , p .Config .Identity (), "output" , string (buf [:n ]))
171- lastActiveAt = time .Now ()
201+ // read stdout line by line
202+ scanner := bufio .NewScanner (stdout )
203+ buf := make ([]byte , 0 , 64 * 1024 )
204+ scanner .Buffer (buf , 10 * 1024 * 1024 )
205+ for scanner .Scan () {
206+ line := scanner .Text ()
207+ log .Info ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "line" , line )
208+ lastActiveAt .Store (time .Now ().UnixNano ())
209+ }
210+ if err := scanner .Err (); err != nil {
211+ errMu .Lock ()
212+ errMsg .WriteString ("stdout scan error: " )
213+ errMsg .WriteString (err .Error ())
214+ errMsg .WriteString ("\n " )
215+ errMu .Unlock ()
216+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "scanner_err" , err .Error ())
172217 }
173218 })
174219
@@ -177,22 +222,26 @@ func (p *LocalPluginRuntime) installDependencies(
177222 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
178223 }, func () {
179224 defer wg .Done ()
180- // read stderr
181- buf := make ([]byte , 1024 )
182- for {
183- n , err := stderr .Read (buf )
184- if err != nil && err != os .ErrClosed {
185- lastActiveAt = time .Now ()
186- errMsg .WriteString (string (buf [:n ]))
187- break
188- } else if err == os .ErrClosed {
189- break
190- }
191-
192- if n > 0 {
193- errMsg .WriteString (string (buf [:n ]))
194- lastActiveAt = time .Now ()
195- }
225+ // read stderr line by line
226+ scanner := bufio .NewScanner (stderr )
227+ buf := make ([]byte , 0 , 64 * 1024 )
228+ scanner .Buffer (buf , 10 * 1024 * 1024 )
229+ for scanner .Scan () {
230+ line := scanner .Text ()
231+ errMu .Lock ()
232+ errMsg .WriteString (line )
233+ errMsg .WriteString ("\n " )
234+ errMu .Unlock ()
235+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "line" , line )
236+ lastActiveAt .Store (time .Now ().UnixNano ())
237+ }
238+ if err := scanner .Err (); err != nil {
239+ errMu .Lock ()
240+ errMsg .WriteString ("stderr scan error: " )
241+ errMsg .WriteString (err .Error ())
242+ errMsg .WriteString ("\n " )
243+ errMu .Unlock ()
244+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "scanner_err" , err .Error ())
196245 }
197246 })
198247
@@ -207,14 +256,16 @@ func (p *LocalPluginRuntime) installDependencies(
207256 break
208257 }
209258
210- if time .Since (lastActiveAt ) > time .Duration (
259+ if time .Since (time . Unix ( 0 , lastActiveAt . Load ()) ) > time .Duration (
211260 p .appConfig .PythonEnvInitTimeout ,
212261 )* time .Second {
213262 cmd .Process .Kill ()
263+ errMu .Lock ()
214264 errMsg .WriteString (fmt .Sprintf (
215265 "init process exited due to no activity for %d seconds" ,
216266 p .appConfig .PythonEnvInitTimeout ,
217267 ))
268+ errMu .Unlock ()
218269 break
219270 }
220271 }
@@ -223,12 +274,29 @@ func (p *LocalPluginRuntime) installDependencies(
223274 wg .Wait ()
224275
225276 if err := cmd .Wait (); err != nil {
277+ log .Error ("dependency installation failed" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String (), "error" , err )
226278 return fmt .Errorf ("failed to install dependencies: %s, output: %s" , err , errMsg .String ())
227279 }
228280
281+ log .Info ("dependency installation finished" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String ())
229282 return nil
230283}
231284
285+ // sanitizeArgs redacts credentials in any URL-like arguments to avoid leaking secrets in logs.
286+ func sanitizeArgs (args []string ) []string {
287+ // Match https://user:pass@ and https://user@
288+ reWithPass := regexp .MustCompile (`(https?://)[^/@:]+:[^/@]+@` )
289+ reUserOnly := regexp .MustCompile (`(https?://)[^/@:]+@` )
290+
291+ out := make ([]string , len (args ))
292+ for i , a := range args {
293+ s := reWithPass .ReplaceAllString (a , "${1}****:****@" )
294+ s = reUserOnly .ReplaceAllString (s , "${1}****:****@" )
295+ out [i ] = s
296+ }
297+ return out
298+ }
299+
232300type PythonVirtualEnvironment struct {
233301 pythonInterpreterPath string
234302}
@@ -356,6 +424,69 @@ func (p *LocalPluginRuntime) markVirtualEnvironmentAsValid() error {
356424 return nil
357425}
358426
427+ // splitByCommaOrSpace splits a list like "a,b c" into tokens.
428+ func splitByCommaOrSpace (s string ) []string {
429+ // replace comma with space then split by spaces
430+ s = strings .ReplaceAll (s , "," , " " )
431+ fields := strings .Fields (s )
432+ return fields
433+ }
434+
435+ // selectURL returns the first non-empty URL from the provided list.
436+ func selectURL (urls ... string ) string {
437+ for _ , u := range urls {
438+ if u != "" {
439+ return u
440+ }
441+ }
442+ return ""
443+ }
444+
445+ // addIndexArgs appends index and extra-index URL arguments to args.
446+ func addIndexArgs (args []string , indexURL string , extraIndexURL string ) []string {
447+ if indexURL != "" {
448+ args = append (args , "-i" , indexURL )
449+ }
450+ if extraIndexURL != "" {
451+ for _ , u := range splitByCommaOrSpace (extraIndexURL ) {
452+ if u != "" {
453+ args = append (args , "--extra-index-url" , u )
454+ }
455+ }
456+ }
457+ return args
458+ }
459+
460+ // deriveTrustedHosts parses hostnames from index/extra URLs and returns a de-duplicated list.
461+ func deriveTrustedHosts (indexURL string , extraIndexURL string ) []string {
462+ set := map [string ]struct {}{}
463+ add := func (raw string ) {
464+ if strings .TrimSpace (raw ) == "" {
465+ return
466+ }
467+ u , err := url .Parse (raw )
468+ if err != nil || u .Host == "" {
469+ return
470+ }
471+ host := u .Host
472+ if i := strings .Index (host , ":" ); i >= 0 {
473+ host = host [:i ]
474+ }
475+ set [host ] = struct {}{}
476+ }
477+ add (indexURL )
478+ for _ , raw := range splitByCommaOrSpace (extraIndexURL ) {
479+ add (raw )
480+ }
481+ out := make ([]string , 0 , len (set ))
482+ for h := range set {
483+ out = append (out , h )
484+ }
485+ // preserve deterministic order: sort hostnames
486+ sort .Strings (out )
487+ return out
488+ }
489+
359490func (p * LocalPluginRuntime ) preCompile (
360491 pythonPath string ,
361492) error {
0 commit comments