diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index f7dfa4eb8..8f1491902 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -2,7 +2,6 @@ package plugin_manager import ( "fmt" - "strings" lru "github.com/hashicorp/golang-lru/v2" "github.com/langgenius/dify-cloud-kit/oss" @@ -18,6 +17,7 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder" "github.com/langgenius/dify-plugin-daemon/pkg/utils/cache" "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/parser" ) type PluginManager struct { @@ -128,7 +128,7 @@ func (p *PluginManager) Launch(configuration *app.Config) { // init redis client if configuration.RedisUseSentinel { // use Redis Sentinel - sentinels := strings.Split(configuration.RedisSentinels, ",") + sentinels := parser.SplitAndTrimCSV(configuration.RedisSentinels) if err := cache.InitRedisSentinelClient( sentinels, configuration.RedisSentinelServiceName, diff --git a/internal/service/debugging_service/connection_key.go b/internal/service/debugging_service/connection_key.go index 8390cb179..3db75b2a4 100644 --- a/internal/service/debugging_service/connection_key.go +++ b/internal/service/debugging_service/connection_key.go @@ -45,10 +45,11 @@ func GetConnectionKey(info ConnectionInfo) (string, error) { ) if err == cache.ErrNotFound { + id2key := strings.Join([]string{CONNECTION_KEY_MANAGER_ID2KEY_PREFIX, info.TenantId}, ":") err := cache.Transaction(func(p redis.Pipeliner) error { k := uuid.New().String() _, err = cache.SetNX( - strings.Join([]string{CONNECTION_KEY_MANAGER_ID2KEY_PREFIX, info.TenantId}, ":"), + id2key, Key{Key: k}, CONNECTION_KEY_EXPIRE_TIME, p, @@ -70,7 +71,7 @@ func GetConnectionKey(info ConnectionInfo) (string, error) { key = &Key{Key: k} return nil - }) + }, id2key) if err != nil { return "", err diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index 8333b5be9..195b675ce 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -582,11 +582,16 @@ func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, err return getCmdable(context...).Expire(ctx, serialKey(key), time).Result() } -func Transaction(fn func(redis.Pipeliner) error) error { +func Transaction(fn func(redis.Pipeliner) error, watchKeys ...string) error { if client == nil { return ErrDBNotInit } + serialized := make([]string, len(watchKeys)) + for i, k := range watchKeys { + serialized[i] = serialKey(k) + } + return client.Watch(ctx, func(tx *redis.Tx) error { _, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error { return fn(p) @@ -595,7 +600,7 @@ func Transaction(fn func(redis.Pipeliner) error) error { return nil } return err - }) + }, serialized...) } func Publish(channel string, message any, context ...redis.Cmdable) error { diff --git a/pkg/utils/parser/csv.go b/pkg/utils/parser/csv.go new file mode 100644 index 000000000..ab48c7af9 --- /dev/null +++ b/pkg/utils/parser/csv.go @@ -0,0 +1,17 @@ +package parser + +import "strings" + +func SplitAndTrimCSV(s string) []string { + if s == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + if trimmed := strings.TrimSpace(p); trimmed != "" { + out = append(out, trimmed) + } + } + return out +} diff --git a/pkg/utils/parser/csv_test.go b/pkg/utils/parser/csv_test.go new file mode 100644 index 000000000..facbe5f28 --- /dev/null +++ b/pkg/utils/parser/csv_test.go @@ -0,0 +1,37 @@ +package parser + +import ( + "reflect" + "testing" +) + +func TestSplitAndTrimCSV(t *testing.T) { + tests := []struct { + name string + input string + want []string + }{ + {"empty string", "", nil}, + {"only separators", ",,,", []string{}}, + {"only whitespace between separators", " , , ,", []string{}}, + {"single value", "a", []string{"a"}}, + {"trims surrounding whitespace", " a , b ", []string{"a", "b"}}, + {"drops empty segments", "a, ,b", []string{"a", "b"}}, + {"preserves inner colons", "host1:6379, host2:6379", []string{"host1:6379", "host2:6379"}}, + {"trailing comma", "a,b,", []string{"a", "b"}}, + {"leading comma", ",a,b", []string{"a", "b"}}, + {"tabs and mixed whitespace", "\ta\t,\nb\n", []string{"a", "b"}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := SplitAndTrimCSV(tt.input) + if len(got) == 0 && len(tt.want) == 0 { + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("SplitAndTrimCSV(%q) = %#v, want %#v", tt.input, got, tt.want) + } + }) + } +}