Skip to content

Commit 1d57c01

Browse files
committed
feat(proxy): route ElasticKV traffic to the current Raft leader via INFO
INFO replication on an ElasticKV node now reports its actual role (master/slave) and a new raft_leader_redis field pointing at the Redis address of the current Raft leader. The proxy wraps the ElasticKV backend in LeaderAwareRedisBackend, which polls INFO every 2s and routes every Do/Pipeline/PubSub through the discovered leader, falling back to the configured seeds when a probe fails. -secondary now accepts a comma-separated seed list.
1 parent 04874ad commit 1d57c01

5 files changed

Lines changed: 725 additions & 4 deletions

File tree

adapter/redis_compat_commands.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,45 @@ type bzpopminResult struct {
7373
}
7474

7575
func (r *RedisServer) info(conn redcon.Conn, _ redcon.Command) {
76+
role := "slave"
77+
if r.coordinator != nil && r.coordinator.IsLeader() {
78+
role = "master"
79+
}
80+
81+
leaderRedis := r.raftLeaderRedisAddr()
82+
7683
conn.WriteBulkString(strings.Join([]string{
7784
"# Server",
7885
"redis_version:7.2.0",
7986
"loading:0",
80-
"role:master",
87+
"role:" + role,
88+
"",
89+
"# Replication",
90+
"role:" + role,
91+
"raft_leader_redis:" + leaderRedis,
8192
"",
8293
}, "\r\n"))
8394
}
8495

96+
// raftLeaderRedisAddr returns the Redis-protocol address of the current Raft
97+
// leader as known by this node. When this node is itself the leader the
98+
// server's own listen address is returned. An empty string is returned when
99+
// the leader is not yet known or when the leader's Redis address is not
100+
// configured in the leaderRedis map.
101+
func (r *RedisServer) raftLeaderRedisAddr() string {
102+
if r.coordinator == nil {
103+
return ""
104+
}
105+
if r.coordinator.IsLeader() {
106+
return r.redisAddr
107+
}
108+
leader := r.coordinator.RaftLeader()
109+
if leader == "" {
110+
return ""
111+
}
112+
return r.leaderRedis[leader]
113+
}
114+
85115
// SETEX key seconds value — equivalent to SET key value EX seconds
86116
func (r *RedisServer) setex(conn redcon.Conn, cmd redcon.Command) {
87117
if r.proxyToLeader(conn, cmd, cmd.Args[1]) {

adapter/redis_info_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"strings"
6+
"testing"
7+
8+
"github.com/bootjp/elastickv/kv"
9+
"github.com/hashicorp/raft"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"github.com/tidwall/redcon"
13+
)
14+
15+
type infoTestCoordinator struct {
16+
isLeader bool
17+
raftLeader raft.ServerAddress
18+
clock *kv.HLC
19+
}
20+
21+
func (c *infoTestCoordinator) Dispatch(context.Context, *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
22+
return &kv.CoordinateResponse{}, nil
23+
}
24+
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }
25+
func (c *infoTestCoordinator) VerifyLeader() error { return nil }
26+
func (c *infoTestCoordinator) RaftLeader() raft.ServerAddress { return c.raftLeader }
27+
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
28+
func (c *infoTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }
29+
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) raft.ServerAddress { return c.raftLeader }
30+
func (c *infoTestCoordinator) Clock() *kv.HLC {
31+
if c.clock == nil {
32+
c.clock = kv.NewHLC()
33+
}
34+
return c.clock
35+
}
36+
37+
func TestRedisServer_Info_LeaderRole(t *testing.T) {
38+
r := &RedisServer{
39+
redisAddr: "10.0.0.1:6379",
40+
leaderRedis: map[raft.ServerAddress]string{"raft-1": "10.0.0.1:6379"},
41+
coordinator: &infoTestCoordinator{isLeader: true, raftLeader: "raft-1"},
42+
}
43+
44+
conn := &recordingConn{}
45+
r.info(conn, redcon.Command{})
46+
47+
out := string(conn.bulk)
48+
assert.Contains(t, out, "# Server", "INFO reply must keep the Server section")
49+
assert.Contains(t, out, "# Replication", "INFO reply must expose a Replication section")
50+
assert.Contains(t, out, "role:master", "leader must advertise role:master")
51+
assert.Contains(t, out, "raft_leader_redis:10.0.0.1:6379",
52+
"leader must point raft_leader_redis at itself")
53+
}
54+
55+
func TestRedisServer_Info_FollowerRole(t *testing.T) {
56+
r := &RedisServer{
57+
redisAddr: "10.0.0.2:6379",
58+
leaderRedis: map[raft.ServerAddress]string{
59+
"raft-1": "10.0.0.1:6379",
60+
"raft-2": "10.0.0.2:6379",
61+
},
62+
coordinator: &infoTestCoordinator{isLeader: false, raftLeader: "raft-1"},
63+
}
64+
65+
conn := &recordingConn{}
66+
r.info(conn, redcon.Command{})
67+
68+
out := string(conn.bulk)
69+
assert.Contains(t, out, "role:slave", "follower must advertise role:slave")
70+
assert.Contains(t, out, "raft_leader_redis:10.0.0.1:6379",
71+
"follower must point raft_leader_redis at the actual leader")
72+
// The role must appear in the Replication section so clients that only
73+
// scan that section still see the right value.
74+
idx := strings.Index(out, "# Replication")
75+
require.GreaterOrEqual(t, idx, 0)
76+
assert.Contains(t, out[idx:], "role:slave")
77+
assert.Contains(t, out[idx:], "raft_leader_redis:10.0.0.1:6379")
78+
}
79+
80+
func TestRedisServer_Info_UnknownLeader(t *testing.T) {
81+
r := &RedisServer{
82+
redisAddr: "10.0.0.3:6379",
83+
leaderRedis: map[raft.ServerAddress]string{},
84+
coordinator: &infoTestCoordinator{isLeader: false, raftLeader: ""},
85+
}
86+
87+
conn := &recordingConn{}
88+
r.info(conn, redcon.Command{})
89+
90+
out := string(conn.bulk)
91+
assert.Contains(t, out, "role:slave")
92+
assert.Contains(t, out, "raft_leader_redis:\r\n",
93+
"when the leader is unknown the field must be empty so clients know to keep probing")
94+
}

cmd/redis-proxy/main.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"os"
1111
"os/signal"
12+
"strings"
1213
"syscall"
1314
"time"
1415

@@ -37,7 +38,7 @@ func run() error {
3738
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
3839
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
3940
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
40-
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
41+
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address. Comma-separated list of seeds is supported; the proxy discovers the current Raft leader via INFO replication.")
4142
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
4243
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
4344
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
@@ -73,14 +74,19 @@ func run() error {
7374
secondaryOpts.DB = cfg.SecondaryDB
7475
secondaryOpts.Password = cfg.SecondaryPassword
7576

77+
secondarySeeds := parseAddrList(cfg.SecondaryAddr)
78+
if len(secondarySeeds) == 0 {
79+
return fmt.Errorf("at least one secondary address is required")
80+
}
81+
7682
var primary, secondary proxy.Backend
7783
switch cfg.Mode {
7884
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
79-
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
85+
primary = proxy.NewLeaderAwareRedisBackend(secondarySeeds, "elastickv", secondaryOpts, logger)
8086
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
8187
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
8288
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
83-
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
89+
secondary = proxy.NewLeaderAwareRedisBackend(secondarySeeds, "elastickv", secondaryOpts, logger)
8490
}
8591
defer primary.Close()
8692
defer secondary.Close()
@@ -124,3 +130,14 @@ func run() error {
124130
}
125131
return nil
126132
}
133+
134+
func parseAddrList(raw string) []string {
135+
parts := strings.Split(raw, ",")
136+
out := make([]string, 0, len(parts))
137+
for _, p := range parts {
138+
if p = strings.TrimSpace(p); p != "" {
139+
out = append(out, p)
140+
}
141+
}
142+
return out
143+
}

0 commit comments

Comments
 (0)