Skip to content

Commit bce814a

Browse files
author
Artem Klyukvin
authored
add possibility to set replica weights explicitly (#28)
1 parent 6c9b171 commit bce814a

2 files changed

Lines changed: 30 additions & 1 deletion

File tree

rediscluster/cluster.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ const (
6464
enabled = 1
6565
)
6666

67+
// WeightProvider explicitly provides weights for redis replicas
68+
type WeightProvider interface {
69+
// GetWeightByHost provides weight by given name. If implementation does not have weight for a given host
70+
// it must return `false` as the second return value. In this case scheduler will fallback to default
71+
// scheduling strategy (either constant weights random or ping latency based random)
72+
GetWeightByHost(host string) (uint32, bool)
73+
}
74+
6775
// Opts holds the options for Cluster
6876
type Opts struct {
6977
// HostOpts - per host options
@@ -98,8 +106,10 @@ type Opts struct {
98106

99107
// RoundRobinSeed - used to choose between master and replica.
100108
RoundRobinSeed RoundRobinSeed
101-
// LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency
109+
// LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency (has lower priority than WeightProvider)
102110
LatencyOrientedRR bool
111+
// WeightProvider - enables to explicitly set weights of replicas (has higher priority than LatencyOrientedRR)
112+
WeightProvider WeightProvider
103113
// Enable connection with TLS
104114
TLSEnabled bool
105115
// Config for TLS connection

rediscluster/mapping.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,25 @@ func (*Cluster) getHealthWeight(weights []uint32, health uint32) uint32 {
341341
}
342342

343343
func (c *Cluster) weightsForPolicySlaves(policy ReplicaPolicyEnum, shard *shard) []uint32 {
344+
if c.opts.WeightProvider == nil {
345+
return c.weightsForPolicySlavesDefault(policy, shard)
346+
}
347+
348+
weights := make([]uint32, 0, len(shard.addr))
349+
for _, addr := range shard.addr {
350+
weight, found := c.opts.WeightProvider.GetWeightByHost(addr)
351+
if !found {
352+
// there was some reconfiguration, so we fallback to default weights
353+
return c.weightsForPolicySlavesDefault(policy, shard)
354+
}
355+
356+
weights = append(weights, weight)
357+
}
358+
359+
return weights
360+
}
361+
362+
func (c *Cluster) weightsForPolicySlavesDefault(policy ReplicaPolicyEnum, shard *shard) []uint32 {
344363
var ws []uint32
345364
if atomic.LoadUint32(&c.latencyAwareness) == disabled {
346365
ws = rr[:]

0 commit comments

Comments
 (0)