Skip to content

Commit dda1344

Browse files
author
Artem Klyukvin
authored
force min ping replica under option (#29)
1 parent bce814a commit dda1344

2 files changed

Lines changed: 35 additions & 3 deletions

File tree

rediscluster/cluster.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ type Opts struct {
108108
RoundRobinSeed RoundRobinSeed
109109
// LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency (has lower priority than WeightProvider)
110110
LatencyOrientedRR bool
111+
// ForceMinLatencyReplica - when LatencyOrientedRR is used forces min latency replica instead of using `weight[i] = sum(ping_latency[i]) / ping_latency[i]` algorithm
112+
ForceMinLatencyReplica bool
111113
// WeightProvider - enables to explicitly set weights of replicas (has higher priority than LatencyOrientedRR)
112114
WeightProvider WeightProvider
113115
// Enable connection with TLS
@@ -132,7 +134,8 @@ type Cluster struct {
132134

133135
opts Opts
134136

135-
latencyAwareness uint32
137+
latencyAwareness uint32
138+
forceMinLatencyReplica uint32
136139

137140
m sync.Mutex
138141

@@ -242,6 +245,10 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e
242245
if cluster.opts.LatencyOrientedRR {
243246
cluster.latencyAwareness = enabled
244247
}
248+
cluster.forceMinLatencyReplica = disabled
249+
if cluster.opts.ForceMinLatencyReplica {
250+
cluster.forceMinLatencyReplica = enabled
251+
}
245252
cluster.opts.HostOpts.TLSEnabled = opts.TLSEnabled
246253
cluster.opts.HostOpts.TLSConfig = opts.TLSConfig
247254

@@ -311,6 +318,15 @@ func (c *Cluster) SetLatencyOrientedRR(v bool) {
311318
}
312319
}
313320

321+
// SetForceMinLatencyReplica changes "min latency replica forcing" on the fly
322+
func (c *Cluster) SetForceMinLatencyReplica(v bool) {
323+
if v {
324+
atomic.StoreUint32(&c.forceMinLatencyReplica, enabled)
325+
} else {
326+
atomic.StoreUint32(&c.forceMinLatencyReplica, disabled)
327+
}
328+
}
329+
314330
func (c *Cluster) control() {
315331
t := time.NewTicker(c.opts.CheckInterval)
316332
defer t.Stop()

rediscluster/slotrange.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rediscluster
22

33
import (
44
"bytes"
5+
"math"
56
"sync/atomic"
67
"time"
78

@@ -173,13 +174,28 @@ func (c *Cluster) updateMappings(slotRanges []redisclusterutil.SlotsRange) {
173174
}
174175
for _, shard := range newConfig.shards {
175176
sumLatency := uint32(0)
176-
for _, addr := range shard.addr {
177+
minLatencyID := 0
178+
minLatency := uint32(math.MaxUint32)
179+
180+
for i, addr := range shard.addr {
177181
node := newConfig.nodes[addr]
178-
sumLatency += atomic.LoadUint32(&node.ping)
182+
pingLatency := atomic.LoadUint32(&node.ping)
183+
if pingLatency < minLatency {
184+
minLatency = pingLatency
185+
minLatencyID = i
186+
}
187+
188+
sumLatency += pingLatency
179189
}
180190
for i, addr := range shard.addr {
181191
node := newConfig.nodes[addr]
192+
182193
weight := sumLatency / atomic.LoadUint32(&node.ping)
194+
if atomic.LoadUint32(&c.forceMinLatencyReplica) == enabled && i == minLatencyID {
195+
const alwaysPrefer = 1_000_000
196+
weight = alwaysPrefer
197+
}
198+
183199
atomic.StoreUint32(&shard.pingWeights[i], weight)
184200
}
185201
}

0 commit comments

Comments
 (0)