Skip to content

Commit 6c9b171

Browse files
author
Artem Klyukvin
authored
refactor shard selection (#27)
1 parent 1e7b2fa commit 6c9b171

3 files changed

Lines changed: 106 additions & 78 deletions

File tree

rediscluster/cluster.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ const (
5959
preferConnected
6060
)
6161

62+
const (
63+
disabled = 0
64+
enabled = 1
65+
)
66+
6267
// Opts holds the options for Cluster
6368
type Opts struct {
6469
// HostOpts - per host options
@@ -144,10 +149,10 @@ type clusterConfig struct {
144149
}
145150

146151
type shard struct {
147-
rr uint32
148-
good uint32
149-
addr []string
150-
weights []uint32
152+
rr uint32
153+
good uint32
154+
addr []string
155+
pingWeights []uint32
151156
}
152157
type shardMap map[uint16]*shard
153158
type masterMap map[string]uint16
@@ -223,9 +228,9 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e
223228
cluster.opts.WaitToMigrate = 100 * time.Millisecond
224229
}
225230

226-
cluster.latencyAwareness = 0
231+
cluster.latencyAwareness = disabled
227232
if cluster.opts.LatencyOrientedRR {
228-
cluster.latencyAwareness = 1
233+
cluster.latencyAwareness = enabled
229234
}
230235
cluster.opts.HostOpts.TLSEnabled = opts.TLSEnabled
231236
cluster.opts.HostOpts.TLSConfig = opts.TLSConfig
@@ -290,9 +295,9 @@ func (c *Cluster) Handle() interface{} {
290295
// SetLatencyOrientedRR changes "latency awareness" on the fly.
291296
func (c *Cluster) SetLatencyOrientedRR(v bool) {
292297
if v {
293-
atomic.StoreUint32(&c.latencyAwareness, 1)
298+
atomic.StoreUint32(&c.latencyAwareness, enabled)
294299
} else {
295-
atomic.StoreUint32(&c.latencyAwareness, 0)
300+
atomic.StoreUint32(&c.latencyAwareness, disabled)
296301
}
297302
}
298303

rediscluster/mapping.go

Lines changed: 87 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -247,90 +247,113 @@ var rr, rs = func() ([32]uint32, [32]uint32) {
247247

248248
// connForSlot returns established connection for slot, if it exists.
249249
func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum, seen []*redisconn.Connection) (*redisconn.Connection, *errorx.Error) {
250-
var conn *redisconn.Connection
251250
cfg := c.getConfig()
252251
shard := cfg.slot2shard(slot)
253-
nodes := cfg.nodes
254252

255253
if shard == nil {
256254
return nil, c.err(ErrClusterConfigEmpty).WithProperty(redis.EKSlot, slot)
257255
}
258256

259-
var addr string
257+
conn := c.connForPolicy(policy, seen, shard, cfg)
258+
if conn == nil {
259+
c.ForceReloading()
260+
return nil, c.err(ErrNoAliveConnection).WithProperty(redis.EKSlot, slot).WithProperty(EKPolicy, policy)
261+
}
262+
return conn, nil
263+
}
264+
265+
func (c *Cluster) connForPolicy(policy ReplicaPolicyEnum, seen []*redisconn.Connection, shard *shard, cfg *clusterConfig) *redisconn.Connection {
260266
switch policy {
261267
case MasterOnly:
262-
addr = shard.addr[0]
263-
node := nodes[addr]
264-
if node == nil {
265-
break /*switch*/
266-
}
267-
conn = node.getConn(c.opts.ConnHostPolicy, preferConnected, seen)
268+
return c.connForPolicyMaster(seen, shard, cfg)
268269
case MasterAndSlaves, PreferSlaves:
269-
var ws [32]uint32
270-
if atomic.LoadUint32(&c.latencyAwareness) == 0 {
271-
ws = rr
272-
if policy == PreferSlaves {
273-
ws = rs
274-
}
275-
} else {
276-
for i := range shard.weights {
277-
ws[i] = atomic.LoadUint32(&shard.weights[i])
278-
}
279-
}
280-
weights := ws[:len(shard.weights)]
270+
return c.connForPolicySlaves(policy, seen, shard, cfg)
271+
default:
272+
panic("unknown policy")
273+
}
274+
}
281275

282-
health := atomic.LoadUint32(&shard.good) // load health information
283-
healthWeight := uint32(0)
284-
for i, w := range weights {
285-
if health&(1<<uint(i)) == 0 {
286-
continue
287-
}
288-
healthWeight += w
289-
}
276+
func (c *Cluster) connForPolicyMaster(seen []*redisconn.Connection, shard *shard, cfg *clusterConfig) *redisconn.Connection {
277+
nodes := cfg.nodes
290278

291-
off := c.opts.RoundRobinSeed.Current()
292-
293-
// First, we try already established connections.
294-
// If no one found, then connections thar are connecting at the moment are tried.
295-
for _, needState := range []int{needConnected, mayBeConnected} {
296-
mask, maskWeight := health, healthWeight
297-
// a bit of quadratic algorithms
298-
for mask != 0 && conn == nil {
299-
r := nextRng(&off, maskWeight)
300-
k := uint(0)
301-
for i, w := range weights {
302-
if mask&(1<<uint(i)) == 0 {
303-
continue
304-
}
305-
if r < w {
306-
k = uint(i)
307-
break
308-
}
309-
r -= w
310-
}
279+
addr := shard.addr[0]
280+
node := nodes[addr]
281+
if node == nil {
282+
return nil
283+
}
284+
return node.getConn(c.opts.ConnHostPolicy, preferConnected, seen)
285+
}
286+
287+
func (c *Cluster) connForPolicySlaves(policy ReplicaPolicyEnum, seen []*redisconn.Connection, shard *shard, cfg *clusterConfig) *redisconn.Connection {
288+
weights := c.weightsForPolicySlaves(policy, shard)
311289

312-
mask &^= 1 << k
313-
maskWeight -= weights[k]
314-
addr = shard.addr[k]
315-
node := nodes[addr]
316-
if node == nil {
317-
// it is strange a bit, but lets ignore
290+
health := atomic.LoadUint32(&shard.good) // load health information
291+
healthWeight := c.getHealthWeight(weights, health)
292+
off := c.opts.RoundRobinSeed.Current()
293+
294+
// First, we try already established connections.
295+
// If no one found, then connections thar are connecting at the moment are tried.
296+
for _, needState := range []int{needConnected, mayBeConnected} {
297+
mask, maskWeight := health, healthWeight
298+
299+
for mask != 0 {
300+
r := nextRng(&off, maskWeight)
301+
k := uint(0)
302+
for i, w := range weights {
303+
if mask&(1<<uint(i)) == 0 { // not healthy
318304
continue
319305
}
320-
conn = node.getConn(c.opts.ConnHostPolicy, needState, seen)
306+
if r < w {
307+
k = uint(i)
308+
break
309+
}
310+
r -= w
311+
}
312+
313+
mask &^= 1 << k
314+
maskWeight -= weights[k]
315+
addr := shard.addr[k]
316+
nodes := cfg.nodes
317+
node := nodes[addr]
318+
if node == nil {
319+
// it is strange a bit, but lets ignore
320+
continue
321321
}
322-
if conn != nil {
323-
break
322+
323+
if conn := node.getConn(c.opts.ConnHostPolicy, needState, seen); conn != nil {
324+
return conn
324325
}
325326
}
326-
default:
327-
panic("unknown policy")
328327
}
329-
if conn == nil {
330-
c.ForceReloading()
331-
return nil, c.err(ErrNoAliveConnection).WithProperty(redis.EKSlot, slot).WithProperty(EKPolicy, policy)
328+
329+
return nil
330+
}
331+
332+
func (*Cluster) getHealthWeight(weights []uint32, health uint32) uint32 {
333+
healthWeight := uint32(0)
334+
for i, w := range weights {
335+
if health&(1<<uint(i)) == 0 {
336+
continue
337+
}
338+
healthWeight += w
332339
}
333-
return conn, nil
340+
return healthWeight
341+
}
342+
343+
func (c *Cluster) weightsForPolicySlaves(policy ReplicaPolicyEnum, shard *shard) []uint32 {
344+
var ws []uint32
345+
if atomic.LoadUint32(&c.latencyAwareness) == disabled {
346+
ws = rr[:]
347+
if policy == PreferSlaves {
348+
ws = rs[:]
349+
}
350+
} else {
351+
ws = make([]uint32, len(shard.pingWeights))
352+
for i := range shard.pingWeights {
353+
ws[i] = atomic.LoadUint32(&shard.pingWeights[i])
354+
}
355+
}
356+
return ws[:len(shard.pingWeights)]
334357
}
335358

336359
func (c *Cluster) connForAddress(addr string) *redisconn.Connection {

rediscluster/slotrange.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ func (c *Cluster) updateMappings(slotRanges []redisclusterutil.SlotsRange) {
113113
newConfig.shards[shardno] = oldshard
114114
} else {
115115
shard := &shard{
116-
addr: addrs,
117-
good: (uint32(1) << uint(len(addrs))) - 1,
118-
weights: make([]uint32, len(addrs)),
116+
addr: addrs,
117+
good: (uint32(1) << uint(len(addrs))) - 1,
118+
pingWeights: make([]uint32, len(addrs)),
119119
}
120120
newConfig.shards[shardno] = shard
121-
for i := range shard.weights {
122-
shard.weights[i] = 1
121+
for i := range shard.pingWeights {
122+
shard.pingWeights[i] = 1
123123
}
124124
}
125125
newConfig.masters[addrs[0]] = shardno
@@ -180,7 +180,7 @@ func (c *Cluster) updateMappings(slotRanges []redisclusterutil.SlotsRange) {
180180
for i, addr := range shard.addr {
181181
node := newConfig.nodes[addr]
182182
weight := sumLatency / atomic.LoadUint32(&node.ping)
183-
atomic.StoreUint32(&shard.weights[i], weight)
183+
atomic.StoreUint32(&shard.pingWeights[i], weight)
184184
}
185185
}
186186
})

0 commit comments

Comments
 (0)