11package local_runtime
22
33import (
4+ "bufio"
45 "bytes"
56 "context"
67 "errors"
@@ -9,9 +10,11 @@ import (
910 "os/exec"
1011 "path"
1112 "path/filepath"
13+ "regexp"
1214 "strconv"
1315 "strings"
1416 "sync"
17+ "sync/atomic"
1518 "time"
1619
1720 routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
@@ -37,9 +40,17 @@ func (p *LocalPluginRuntime) prepareUV() (string, error) {
3740func (p * LocalPluginRuntime ) preparePipArgs () []string {
3841 args := []string {"install" }
3942
40- if p .appConfig .PipMirrorUrl != "" {
41- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
43+ // Determine index URL precedence for pip install:
44+ // 1) PipIndexUrl
45+ // 2) PipMirrorUrl (legacy/custom)
46+ // 3) UvIndexUrl (as fallback if user only set UV var)
47+ indexURL := selectURL (p .appConfig .PipIndexUrl , p .appConfig .PipMirrorUrl , p .appConfig .UvIndexUrl )
48+ // Extra index URLs (comma or space separated); fallback to UV extras
49+ extra := p .appConfig .PipExtraIndexUrl
50+ if extra == "" {
51+ extra = p .appConfig .UvExtraIndexUrl
4252 }
53+ args = addIndexArgs (args , indexURL , extra )
4354
4455 args = append (args , "-r" , "requirements.txt" )
4556
@@ -60,9 +71,17 @@ func (p *LocalPluginRuntime) preparePipArgs() []string {
6071func (p * LocalPluginRuntime ) prepareSyncArgs () []string {
6172 args := []string {"sync" , "--no-dev" }
6273
63- if p .appConfig .PipMirrorUrl != "" {
64- args = append (args , "-i" , p .appConfig .PipMirrorUrl )
74+ // Determine index URL precedence for uv sync:
75+ // 1) UvIndexUrl
76+ // 2) PipMirrorUrl (legacy/custom)
77+ // 3) PipIndexUrl
78+ indexURL := selectURL (p .appConfig .UvIndexUrl , p .appConfig .PipMirrorUrl , p .appConfig .PipIndexUrl )
79+ // Extra index URLs; fallback to pip extras
80+ extra := p .appConfig .UvExtraIndexUrl
81+ if extra == "" {
82+ extra = p .appConfig .PipExtraIndexUrl
6583 }
84+ args = addIndexArgs (args , indexURL , extra )
6685
6786 if p .appConfig .PipVerbose {
6887 args = append (args , "-v" )
@@ -124,6 +143,20 @@ func (p *LocalPluginRuntime) installDependencies(
124143 }
125144 cmd .Dir = p .State .WorkingPath
126145
146+ // log start with sanitized args
147+ startAt := time .Now ()
148+ sanitized := sanitizeArgs (args )
149+ log .Info ("starting dependency installation" ,
150+ "plugin" , p .Config .Identity (),
151+ "method" , func () string {
152+ if dependencyFileType == pyprojectTomlFile {
153+ return "uv sync"
154+ } else {
155+ return "uv pip install"
156+ }
157+ }(),
158+ "args" , strings .Join (sanitized , " " ))
159+
127160 // get stdout and stderr
128161 stdout , err := cmd .StdoutPipe ()
129162 if err != nil {
@@ -149,26 +182,34 @@ func (p *LocalPluginRuntime) installDependencies(
149182 }()
150183
151184 var errMsg strings.Builder
185+ var errMu sync.Mutex
152186 var wg sync.WaitGroup
153187 wg .Add (2 )
154188
155- lastActiveAt := time .Now ()
189+ var lastActiveAt atomic.Int64
190+ lastActiveAt .Store (time .Now ().UnixNano ())
156191
157192 routine .Submit (routinepkg.Labels {
158193 routinepkg .RoutineLabelKeyModule : "plugin_manager" ,
159194 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
160195 }, func () {
161196 defer wg .Done ()
162- // read stdout
163- buf := make ([]byte , 1024 )
164- for {
165- n , err := stdout .Read (buf )
166- if err != nil {
167- break
168- }
169- // FIXME: move the log to separated layer
170- log .Info ("installing plugin" , "plugin" , p .Config .Identity (), "output" , string (buf [:n ]))
171- lastActiveAt = time .Now ()
197+ // read stdout line by line
198+ scanner := bufio .NewScanner (stdout )
199+ buf := make ([]byte , 0 , 64 * 1024 )
200+ scanner .Buffer (buf , 10 * 1024 * 1024 )
201+ for scanner .Scan () {
202+ line := scanner .Text ()
203+ log .Info ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "line" , line )
204+ lastActiveAt .Store (time .Now ().UnixNano ())
205+ }
206+ if err := scanner .Err (); err != nil {
207+ errMu .Lock ()
208+ errMsg .WriteString ("stdout scan error: " )
209+ errMsg .WriteString (err .Error ())
210+ errMsg .WriteString ("\n " )
211+ errMu .Unlock ()
212+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stdout" , "scanner_err" , err .Error ())
172213 }
173214 })
174215
@@ -177,22 +218,26 @@ func (p *LocalPluginRuntime) installDependencies(
177218 routinepkg .RoutineLabelKeyMethod : "InitPythonEnvironment" ,
178219 }, func () {
179220 defer wg .Done ()
180- // read stderr
181- buf := make ([]byte , 1024 )
182- for {
183- n , err := stderr .Read (buf )
184- if err != nil && err != os .ErrClosed {
185- lastActiveAt = time .Now ()
186- errMsg .WriteString (string (buf [:n ]))
187- break
188- } else if err == os .ErrClosed {
189- break
190- }
191-
192- if n > 0 {
193- errMsg .WriteString (string (buf [:n ]))
194- lastActiveAt = time .Now ()
195- }
221+ // read stderr line by line
222+ scanner := bufio .NewScanner (stderr )
223+ buf := make ([]byte , 0 , 64 * 1024 )
224+ scanner .Buffer (buf , 10 * 1024 * 1024 )
225+ for scanner .Scan () {
226+ line := scanner .Text ()
227+ errMu .Lock ()
228+ errMsg .WriteString (line )
229+ errMsg .WriteString ("\n " )
230+ errMu .Unlock ()
231+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "line" , line )
232+ lastActiveAt .Store (time .Now ().UnixNano ())
233+ }
234+ if err := scanner .Err (); err != nil {
235+ errMu .Lock ()
236+ errMsg .WriteString ("stderr scan error: " )
237+ errMsg .WriteString (err .Error ())
238+ errMsg .WriteString ("\n " )
239+ errMu .Unlock ()
240+ log .Warn ("install deps" , "plugin" , p .Config .Identity (), "stream" , "stderr" , "scanner_err" , err .Error ())
196241 }
197242 })
198243
@@ -207,14 +252,16 @@ func (p *LocalPluginRuntime) installDependencies(
207252 break
208253 }
209254
210- if time .Since (lastActiveAt ) > time .Duration (
255+ if time .Since (time . Unix ( 0 , lastActiveAt . Load ()) ) > time .Duration (
211256 p .appConfig .PythonEnvInitTimeout ,
212257 )* time .Second {
213258 cmd .Process .Kill ()
259+ errMu .Lock ()
214260 errMsg .WriteString (fmt .Sprintf (
215261 "init process exited due to no activity for %d seconds" ,
216262 p .appConfig .PythonEnvInitTimeout ,
217263 ))
264+ errMu .Unlock ()
218265 break
219266 }
220267 }
@@ -223,12 +270,29 @@ func (p *LocalPluginRuntime) installDependencies(
223270 wg .Wait ()
224271
225272 if err := cmd .Wait (); err != nil {
273+ log .Error ("dependency installation failed" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String (), "error" , err )
226274 return fmt .Errorf ("failed to install dependencies: %s, output: %s" , err , errMsg .String ())
227275 }
228276
277+ log .Info ("dependency installation finished" , "plugin" , p .Config .Identity (), "duration" , time .Since (startAt ).String ())
229278 return nil
230279}
231280
281+ // sanitizeArgs redacts credentials in any URL-like arguments to avoid leaking secrets in logs.
282+ func sanitizeArgs (args []string ) []string {
283+ // Match https://user:pass@ and https://user@
284+ reWithPass := regexp .MustCompile (`(https?://)[^/@:]+:[^/@]+@` )
285+ reUserOnly := regexp .MustCompile (`(https?://)[^/@:]+@` )
286+
287+ out := make ([]string , len (args ))
288+ for i , a := range args {
289+ s := reWithPass .ReplaceAllString (a , "${1}****:****@" )
290+ s = reUserOnly .ReplaceAllString (s , "${1}****:****@" )
291+ out [i ] = s
292+ }
293+ return out
294+ }
295+
232296type PythonVirtualEnvironment struct {
233297 pythonInterpreterPath string
234298}
@@ -356,6 +420,39 @@ func (p *LocalPluginRuntime) markVirtualEnvironmentAsValid() error {
356420 return nil
357421}
358422
423+ // splitByCommaOrSpace splits a list like "a,b c" into tokens.
424+ func splitByCommaOrSpace (s string ) []string {
425+ // replace comma with space then split by spaces
426+ s = strings .ReplaceAll (s , "," , " " )
427+ fields := strings .Fields (s )
428+ return fields
429+ }
430+
431+ // selectURL returns the first non-empty URL from the provided list.
432+ func selectURL (urls ... string ) string {
433+ for _ , u := range urls {
434+ if u != "" {
435+ return u
436+ }
437+ }
438+ return ""
439+ }
440+
441+ // addIndexArgs appends index and extra-index URL arguments to args.
442+ func addIndexArgs (args []string , indexURL string , extraIndexURL string ) []string {
443+ if indexURL != "" {
444+ args = append (args , "-i" , indexURL )
445+ }
446+ if extraIndexURL != "" {
447+ for _ , u := range splitByCommaOrSpace (extraIndexURL ) {
448+ if u != "" {
449+ args = append (args , "--extra-index-url" , u )
450+ }
451+ }
452+ }
453+ return args
454+ }
455+
359456func (p * LocalPluginRuntime ) preCompile (
360457 pythonPath string ,
361458) error {
0 commit comments