Skip to content

Commit e8f8f17

Browse files
fatelei and Yeuoly authored
feat: add redis distribute lock (#564)
* feat: add redis distribute lock
* refactor: use structured logging for python env

---------

Co-authored-by: Yeuoly <admin@srmxy.cn>
1 parent 2be1f65 commit e8f8f17

8 files changed

Lines changed: 65 additions & 20 deletions

File tree

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ require (
3737
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect
3838
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect
3939
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
40+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
41+
github.com/alicebob/miniredis/v2 v2.30.5 // indirect
4042
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
4143
github.com/atotto/clipboard v0.1.4 // indirect
4244
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
@@ -115,6 +117,7 @@ require (
115117
github.com/x448/float16 v0.8.4 // indirect
116118
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
117119
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
120+
github.com/yuin/gopher-lua v1.1.0 // indirect
118121
github.com/zeebo/errs v1.4.0 // indirect
119122
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
120123
go.opentelemetry.io/contrib/detectors/gcp v1.35.0 // indirect

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
4141
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI=
4242
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
4343
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
44+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
45+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
46+
github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0=
47+
github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
4448
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
4549
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
4650
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
@@ -103,6 +107,9 @@ github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqo
103107
github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
104108
github.com/charmbracelet/x/term v0.2.0 h1:cNB9Ot9q8I711MyZ7myUR5HFWL/lc3OpU8jZ4hwm0x0=
105109
github.com/charmbracelet/x/term v0.2.0/go.mod h1:GVxgxAbjUrmpvIINHIQnJJKpMlHiZ4cktEQCN6GWyF0=
110+
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
111+
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
112+
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
106113
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
107114
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
108115
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
@@ -352,6 +359,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
352359
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
353360
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
354361
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
362+
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
363+
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
355364
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
356365
github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
357366
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
@@ -397,6 +406,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
397406
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
398407
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
399408
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
409+
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
400410
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
401411
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
402412
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

internal/core/control_panel/launcher_local.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
"errors"
55
"fmt"
66
"sync"
7+
"time"
78

89
"github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime"
910
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
1011
routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
12+
"github.com/langgenius/dify-plugin-daemon/pkg/utils/cache"
13+
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
1114
"github.com/langgenius/dify-plugin-daemon/pkg/utils/routine"
1215
)
1316

@@ -62,15 +65,41 @@ func (c *ControlPanel) LaunchLocalPlugin(
6265
// init environment
6366
// whatever it's a user request to launch a plugin or a new plugin was found
6467
// by watch dog, initialize environment is a must
65-
if err := runtime.InitEnvironment(decoder); err != nil {
66-
err = errors.Join(err, fmt.Errorf("failed to init environment"))
67-
// notify new runtime launch failed
68-
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
69-
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
70-
})
71-
// release semaphore
72-
releaseLockAndSemaphore()
73-
return nil, nil, err
68+
// To avoid cross-pod races on Python venv creation, guard InitEnvironment with a Redis-based distributed lock.
69+
{
70+
lockKey := fmt.Sprintf("env_init_lock:%s", pluginUniqueIdentifier.String())
71+
// expire: generous upper bound for env initialization; tryTimeout: how long to wait to acquire the lock before giving up (shorter than expire)
72+
expire := 15 * time.Minute
73+
tryTimeout := 2 * time.Minute
74+
log.Info("acquiring distributed init lock", "plugin", pluginUniqueIdentifier.String(), "expire", expire.String())
75+
if err := cache.Lock(lockKey, expire, tryTimeout); err != nil {
76+
// failed to acquire the lock within timeout
77+
err = errors.Join(err, fmt.Errorf("failed to acquire distributed env-init lock"))
78+
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
79+
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
80+
})
81+
// release semaphore and local lock
82+
releaseLockAndSemaphore()
83+
return nil, nil, err
84+
}
85+
defer func() {
86+
if unlockErr := cache.Unlock(lockKey); unlockErr != nil {
87+
log.Warn("failed to release distributed init lock", "plugin", pluginUniqueIdentifier.String(), "error", unlockErr.Error())
88+
} else {
89+
log.Info("released distributed init lock", "plugin", pluginUniqueIdentifier.String())
90+
}
91+
}()
92+
93+
if err := runtime.InitEnvironment(decoder); err != nil {
94+
err = errors.Join(err, fmt.Errorf("failed to init environment"))
95+
// notify new runtime launch failed
96+
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
97+
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
98+
})
99+
// release semaphore
100+
releaseLockAndSemaphore()
101+
return nil, nil, err
102+
}
74103
}
75104

76105
once := sync.Once{}

internal/core/local_runtime/environment_python.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
1919
switch err {
2020
case ErrVirtualEnvironmentInvalid:
2121
// remove the venv and rebuild it
22+
log.Warn("virtual environment for %s is invalid; deleting and recreating", p.Config.Identity())
2223
p.deleteVirtualEnvironment()
2324

2425
// create virtual environment

internal/core/local_runtime/setup_python_environment.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,11 +277,15 @@ func (p *LocalPluginRuntime) checkPythonVirtualEnvironment() (*PythonVirtualEnvi
277277

278278
func (p *LocalPluginRuntime) deleteVirtualEnvironment() error {
279279
// check if virtual environment exists
280-
if _, err := os.Stat(path.Join(p.State.WorkingPath, envPath)); err != nil {
281-
return nil
280+
venvDir := path.Join(p.State.WorkingPath, envPath)
281+
if _, err := os.Stat(venvDir); err != nil {
282+
if errors.Is(err, os.ErrNotExist) {
283+
return nil
284+
}
285+
return err
282286
}
283-
284-
return os.RemoveAll(path.Join(p.State.WorkingPath, envPath))
287+
log.Warn("deleting existing Python virtual environment", "plugin", p.Config.Identity(), "path", venvDir)
288+
return os.RemoveAll(venvDir)
285289
}
286290

287291
func (p *LocalPluginRuntime) createVirtualEnvironment(

pkg/utils/cache/helper/keys.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ func EndpointCacheKey(hookId string) string {
2222
},
2323
":",
2424
)
25-
}
25+
}

pkg/utils/mapping/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
5555
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
5656
m.mu.Lock()
5757
defer m.mu.Unlock()
58-
58+
5959
v, loaded := m.store.LoadOrStore(key, value)
6060
actual = v.(V)
6161
if !loaded {

pkg/utils/mapping/sync_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestConcurrentAccess(t *testing.T) {
5656

5757
var wg sync.WaitGroup
5858
wg.Add(workers)
59-
59+
6060
for i := 0; i < workers; i++ {
6161
go func(i int) {
6262
defer wg.Done()
@@ -78,7 +78,7 @@ func TestLoadOrStore(t *testing.T) {
7878
m := Map[string, interface{}]{}
7979

8080
// First store
81-
val, loaded := m.LoadOrStore("data", []byte{1,2,3})
81+
val, loaded := m.LoadOrStore("data", []byte{1, 2, 3})
8282
if loaded || val.([]byte)[0] != 1 {
8383
t.Error("Initial LoadOrStore failed")
8484
}
@@ -90,8 +90,6 @@ func TestLoadOrStore(t *testing.T) {
9090
}
9191
}
9292

93-
94-
9593
// TestEdgeCases covers special scenarios
9694
func TestEdgeCases(t *testing.T) {
9795
t.Parallel()
@@ -108,4 +106,4 @@ func TestEdgeCases(t *testing.T) {
108106
if m.Len() != 0 {
109107
t.Error("Clear failed to reset map")
110108
}
111-
}
109+
}

0 commit comments

Comments
 (0)