Skip to content

Commit 98b24b5

Browse files
lin-snowclaude
andauthored
fix(cache): pass watch keys to Transaction so optimistic lock actually works (#701)
* feat(redis): add Redis Cluster client path to plugin-daemon Wire the missing third branch so operators can point the daemon at a Redis Cluster without falling back to dialing an empty REDIS_HOST:PORT and panicking at startup. * introduce REDIS_USE_CLUSTERS / REDIS_CLUSTERS / REDIS_CLUSTERS_PASSWORD envs, aligned with dify api naming so Helm can set a single env group * branch in PluginManager.Launch for Cluster ahead of the existing Sentinel / standalone paths; falls back to REDIS_PASSWORD if the cluster-specific password is not set * implement cache.InitRedisClusterClient via redis.NewClusterClient; downstream cache / lock / pub-sub helpers keep working because `client` is declared as redis.UniversalClient Redis Cluster disables SELECT DB, so the cluster branch does not plumb RedisDB. This is intentional and documented in the release note accompanying the overall Redis Cluster support work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(redis): validate Redis cluster addresses and improve transaction handling * feat(redis): implement CSV parsing utility and refactor Redis transaction handling * feat(redis): scope plugin-daemon to standalone + sentinel only Narrow the plugin-daemon Redis client to standalone and sentinel modes; remove the cluster code paths that were briefly exercised on this branch. Changes: - Remove REDIS_USE_CLUSTERS / REDIS_CLUSTERS / REDIS_CLUSTERS_PASSWORD env fields from app.Config. - Remove the cluster branch (plus the two cluster-specific fail-fast guards on REDIS_USE_CLUSTERS + REDIS_USE_SENTINEL and REDIS_DB != 0) from PluginManager.Launch. The init block is now just sentinel or standalone. - Delete cache.InitRedisClusterClient. The main redis client + all helpers (Transaction, pub/sub, lock, etc.) already route through redis.UniversalClient so downstream code needs no adjustment. Intentionally kept: - Transaction(fn, watchKeys...) signature: the variadic signature is backwards-compatible with every existing Transaction(fn) call and the one new-style caller (debugging_service) benefits from real WATCH semantics even on standalone — this is correctness-independent of cluster support. - parser.SplitAndTrimCSV: the sentinel branch migrated to it in 617e7a5 and still uses it to parse REDIS_SENTINELS, so the helper has a standalone/sentinel consumer and stays. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 41f89d2 commit 98b24b5

5 files changed

Lines changed: 66 additions & 6 deletions

File tree

internal/core/plugin_manager/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package plugin_manager
22

33
import (
44
"fmt"
5-
"strings"
65

76
lru "github.com/hashicorp/golang-lru/v2"
87
"github.com/langgenius/dify-cloud-kit/oss"
@@ -18,6 +17,7 @@ import (
1817
"github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
1918
"github.com/langgenius/dify-plugin-daemon/pkg/utils/cache"
2019
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
20+
"github.com/langgenius/dify-plugin-daemon/pkg/utils/parser"
2121
)
2222

2323
type PluginManager struct {
@@ -128,7 +128,7 @@ func (p *PluginManager) Launch(configuration *app.Config) {
128128
// init redis client
129129
if configuration.RedisUseSentinel {
130130
// use Redis Sentinel
131-
sentinels := strings.Split(configuration.RedisSentinels, ",")
131+
sentinels := parser.SplitAndTrimCSV(configuration.RedisSentinels)
132132
if err := cache.InitRedisSentinelClient(
133133
sentinels,
134134
configuration.RedisSentinelServiceName,

internal/service/debugging_service/connection_key.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ func GetConnectionKey(info ConnectionInfo) (string, error) {
4545
)
4646

4747
if err == cache.ErrNotFound {
48+
id2key := strings.Join([]string{CONNECTION_KEY_MANAGER_ID2KEY_PREFIX, info.TenantId}, ":")
4849
err := cache.Transaction(func(p redis.Pipeliner) error {
4950
k := uuid.New().String()
5051
_, err = cache.SetNX(
51-
strings.Join([]string{CONNECTION_KEY_MANAGER_ID2KEY_PREFIX, info.TenantId}, ":"),
52+
id2key,
5253
Key{Key: k},
5354
CONNECTION_KEY_EXPIRE_TIME,
5455
p,
@@ -70,7 +71,7 @@ func GetConnectionKey(info ConnectionInfo) (string, error) {
7071
key = &Key{Key: k}
7172

7273
return nil
73-
})
74+
}, id2key)
7475

7576
if err != nil {
7677
return "", err

pkg/utils/cache/redis.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,16 @@ func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, err
582582
return getCmdable(context...).Expire(ctx, serialKey(key), time).Result()
583583
}
584584

585-
func Transaction(fn func(redis.Pipeliner) error) error {
585+
func Transaction(fn func(redis.Pipeliner) error, watchKeys ...string) error {
586586
if client == nil {
587587
return ErrDBNotInit
588588
}
589589

590+
serialized := make([]string, len(watchKeys))
591+
for i, k := range watchKeys {
592+
serialized[i] = serialKey(k)
593+
}
594+
590595
return client.Watch(ctx, func(tx *redis.Tx) error {
591596
_, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
592597
return fn(p)
@@ -595,7 +600,7 @@ func Transaction(fn func(redis.Pipeliner) error) error {
595600
return nil
596601
}
597602
return err
598-
})
603+
}, serialized...)
599604
}
600605

601606
func Publish(channel string, message any, context ...redis.Cmdable) error {

pkg/utils/parser/csv.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package parser
2+
3+
import "strings"
4+
5+
func SplitAndTrimCSV(s string) []string {
6+
if s == "" {
7+
return nil
8+
}
9+
parts := strings.Split(s, ",")
10+
out := make([]string, 0, len(parts))
11+
for _, p := range parts {
12+
if trimmed := strings.TrimSpace(p); trimmed != "" {
13+
out = append(out, trimmed)
14+
}
15+
}
16+
return out
17+
}

pkg/utils/parser/csv_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package parser
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestSplitAndTrimCSV(t *testing.T) {
9+
tests := []struct {
10+
name string
11+
input string
12+
want []string
13+
}{
14+
{"empty string", "", nil},
15+
{"only separators", ",,,", []string{}},
16+
{"only whitespace between separators", " , , ,", []string{}},
17+
{"single value", "a", []string{"a"}},
18+
{"trims surrounding whitespace", " a , b ", []string{"a", "b"}},
19+
{"drops empty segments", "a, ,b", []string{"a", "b"}},
20+
{"preserves inner colons", "host1:6379, host2:6379", []string{"host1:6379", "host2:6379"}},
21+
{"trailing comma", "a,b,", []string{"a", "b"}},
22+
{"leading comma", ",a,b", []string{"a", "b"}},
23+
{"tabs and mixed whitespace", "\ta\t,\nb\n", []string{"a", "b"}},
24+
}
25+
26+
for _, tt := range tests {
27+
t.Run(tt.name, func(t *testing.T) {
28+
got := SplitAndTrimCSV(tt.input)
29+
if len(got) == 0 && len(tt.want) == 0 {
30+
return
31+
}
32+
if !reflect.DeepEqual(got, tt.want) {
33+
t.Errorf("SplitAndTrimCSV(%q) = %#v, want %#v", tt.input, got, tt.want)
34+
}
35+
})
36+
}
37+
}

0 commit comments

Comments
 (0)