Skip to content

Commit f1f1a15

Browse files
Fabian Hollerfho
authored andcommitted
retry after pause when UpdateState returns an error
Since grpc-go 1.38 the UpdateState() call of the ClientConn can return an error. When UpdateState() returns an error the grpcconsulresolver now polls consul and retries calling UpdateState() again after a backoff pause. This is analogue to how the dns resolver that is part of the grpc-go package handles UpdateState() errors. Instead of using a timer to pause before polling consul again, the WaitTime for the consul query is set to the amount of the backoff timeout. This has the advantage that if the address is consul is updated, the wait is interrupted and UpdateState() is called again with the new addresses. The backoff implementation was restored from the commit 87eb606.
1 parent 1188d19 commit f1f1a15

6 files changed

Lines changed: 198 additions & 8 deletions

File tree

consul/backoff.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package consul
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
)
7+
8+
type backoff struct {
9+
intervals []time.Duration
10+
jitterPct int
11+
jitterSrc *rand.Rand
12+
}
13+
14+
var defBackoff = backoff{
15+
intervals: []time.Duration{
16+
500 * time.Millisecond,
17+
2 * time.Second,
18+
5 * time.Second,
19+
10 * time.Second,
20+
30 * time.Second,
21+
1 * time.Minute,
22+
},
23+
24+
jitterPct: 20,
25+
jitterSrc: rand.New(rand.NewSource(time.Now().UnixNano())),
26+
}
27+
28+
func (b *backoff) Backoff(retry int) time.Duration {
29+
idx := retry
30+
31+
if idx < 0 || idx > len(b.intervals)-1 {
32+
idx = len(b.intervals) - 1
33+
}
34+
35+
d := b.intervals[idx]
36+
37+
return d + time.Duration(((int64(d) / 100) * int64((b.jitterSrc.Intn(b.jitterPct)))))
38+
}

consul/backoff_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package consul
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func maxBackoffJitter(idx int) time.Duration {
9+
return time.Duration(int64(defBackoff.intervals[idx]) / 100 * int64(defBackoff.jitterPct))
10+
}
11+
12+
func TestBackoff(t *testing.T) {
13+
for i := range defBackoff.intervals {
14+
d := defBackoff.Backoff(i)
15+
if d < defBackoff.intervals[i] {
16+
t.Errorf("duration for retry %d is %d expected >= %d", i, d, defBackoff.intervals[i])
17+
}
18+
19+
maxJitter := maxBackoffJitter(i)
20+
if d >= d+maxJitter {
21+
t.Errorf("duration for retry %d is %d expected <= %d", i, d, d+maxJitter)
22+
}
23+
}
24+
25+
}
26+
27+
func TestBackoff_IntervalIdxBounds(t *testing.T) {
28+
tests := []struct {
29+
name string
30+
retry int
31+
idx int
32+
}{
33+
{
34+
name: "retry: -1",
35+
retry: -1,
36+
idx: 0,
37+
},
38+
{
39+
name: "retry > interval len",
40+
retry: len(defBackoff.intervals),
41+
idx: len(defBackoff.intervals) - 1,
42+
},
43+
}
44+
for _, tt := range tests {
45+
t.Run(tt.name, func(t *testing.T) {
46+
retry := tt.retry
47+
48+
d := defBackoff.Backoff(retry)
49+
if d < defBackoff.intervals[tt.idx] {
50+
t.Errorf("duration for retry %d is %d expected >= %d", retry, d, defBackoff.intervals[tt.idx])
51+
}
52+
53+
maxJitter := maxBackoffJitter(tt.idx)
54+
if d >= d+maxJitter {
55+
t.Errorf("duration for retry %d is %d expected <= %d", retry, d, d+maxJitter)
56+
}
57+
58+
})
59+
}
60+
}

consul/resolver.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type consulResolver struct {
3030
healthFilter healthFilter
3131
ctx context.Context
3232
cancel context.CancelFunc
33+
backoff *backoff
3334
resolveNow chan struct{}
3435
wgStop sync.WaitGroup
3536
}
@@ -73,6 +74,7 @@ func newConsulResolver(
7374
service: consulService,
7475
tags: tags,
7576
healthFilter: healthFilter,
77+
backoff: &defBackoff,
7678
ctx: ctx,
7779
cancel: cancel,
7880
resolveNow: make(chan struct{}, 1),
@@ -171,12 +173,24 @@ func (c *consulResolver) watcher() {
171173
defer c.wgStop.Done()
172174

173175
for {
176+
var retryCount int
177+
174178
for {
175179
var addrs []resolver.Address
176180
var err error
177181

178182
lastWaitIndex := opts.WaitIndex
179183

184+
if retryCount > 0 {
185+
// if UpdateState returned an error, we retry resolving
186+
// + setting UpdateState after a backoff pause.
187+
// The pause is applied by doing a consul query
188+
// and setting a timeout. This has the
189+
// advantage that the pause is also interrupted
190+
// if the entry in consul was updated.
191+
opts.WaitTime = c.backoff.Backoff(retryCount)
192+
}
193+
180194
queryStartTime := time.Now()
181195
addrs, opts.WaitIndex, err = c.query(opts)
182196
if err != nil {
@@ -206,7 +220,9 @@ func (c *consulResolver) watcher() {
206220
// addresses (addrs is nil), we have to report an empty
207221
// set of resolved addresses. It informs the grpc-balancer that resolution is not
208222
// in progress anymore and grpc calls can failFast.
209-
if addressesEqual(addrs, lastReportedAddrs) {
223+
// If the previous call to UpdateState failed, retryCount is != 0, then this
224+
// check is ignored and UpdateState is called again.
225+
if addressesEqual(addrs, lastReportedAddrs) && retryCount == 0 {
210226
// If the consul server responds with
211227
// the same data then in the last
212228
// query in less then 50ms, we sleep a
@@ -223,8 +239,15 @@ func (c *consulResolver) watcher() {
223239
continue
224240
}
225241

226-
c.cc.UpdateState(resolver.State{Addresses: addrs})
227242
lastReportedAddrs = addrs
243+
err = c.cc.UpdateState(resolver.State{Addresses: addrs})
244+
if err != nil {
245+
grpclog.Infof("grpcconsulresolver: updateState failed: %s, retrying after pause", err)
246+
retryCount++
247+
continue
248+
}
249+
250+
retryCount = 0
228251
}
229252

230253
select {

consul/resolver_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,3 +565,60 @@ func TestQueryResultsAreSorted(t *testing.T) {
565565

566566
r.Close()
567567
}
568+
569+
func TestConsulIsPolledOnUpdateStateErr(t *testing.T) {
570+
cc := mocks.NewClientConn()
571+
health := mocks.NewConsulHealthClient()
572+
cleanup := replaceCreateHealthClientFn(
573+
func(cfg *consul.Config) (consulHealthEndpoint, error) {
574+
return health, nil
575+
},
576+
)
577+
t.Cleanup(cleanup)
578+
579+
health.SetRespEntries([]*consul.ServiceEntry{
580+
{
581+
Service: &consul.AgentService{
582+
Address: "227.0.0.1",
583+
Port: 1,
584+
},
585+
},
586+
})
587+
588+
defBackoff.intervals = []time.Duration{
589+
1 * time.Millisecond,
590+
2 * time.Millisecond,
591+
3 * time.Millisecond,
592+
4 * time.Millisecond,
593+
5 * time.Millisecond,
594+
}
595+
596+
r, err := NewBuilder().Build(resolver.Target{Endpoint: "test"}, cc, resolver.BuildOptions{})
597+
if err != nil {
598+
t.Fatal("Build() failed:", err.Error())
599+
}
600+
601+
cc.SetUpdateStateReturnVal(errors.New("mocked updatestate err"))
602+
603+
r.ResolveNow(resolver.ResolveNowOptions{})
604+
605+
// consul should be polled and updateState queried again after exponential sleep
606+
for cc.UpdateStateCallCnt() < 3 {
607+
time.Sleep(500 * time.Microsecond)
608+
}
609+
610+
// change UpdateState to return a nil error, afterwards UpdateState should only be called 1 time again
611+
cc.SetUpdateStateReturnVal(nil)
612+
updateStateCalls := cc.UpdateStateCallCnt()
613+
614+
maxBackoff := defBackoff.intervals[len(defBackoff.intervals)-1]
615+
maxSleep := maxBackoff + maxBackoff*(time.Duration((defBackoff.jitterPct / 100))) + time.Second
616+
time.Sleep(maxSleep)
617+
618+
updateStateCallsAfterSuccess := cc.UpdateStateCallCnt()
619+
if updateStateCalls+1 > updateStateCallsAfterSuccess {
620+
t.Errorf("resolver called UpdateState %d times after resolving succeeded, expected <= %d calls",
621+
updateStateCallsAfterSuccess, updateStateCalls+1,
622+
)
623+
}
624+
}

internal/mocks/clientconn.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99
)
1010

1111
type ClientConn struct {
12-
mutex sync.Mutex
13-
addrs []resolver.Address
14-
newAddressCallCnt int
15-
lastReportedError error
12+
mutex sync.Mutex
13+
addrs []resolver.Address
14+
newAddressCallCnt int
15+
lastReportedError error
16+
updateStateReturnVal error
1617
}
1718

1819
func NewClientConn() *ClientConn {
@@ -37,12 +38,14 @@ func (t *ClientConn) LastReportedError() error {
3738
return t.lastReportedError
3839
}
3940

40-
func (t *ClientConn) UpdateState(state resolver.State) {
41+
func (t *ClientConn) UpdateState(state resolver.State) error {
4142
t.mutex.Lock()
4243
defer t.mutex.Unlock()
4344

4445
t.addrs = state.Addresses
4546
t.newAddressCallCnt++
47+
48+
return t.updateStateReturnVal
4649
}
4750

4851
func (t *ClientConn) NewAddress(addrs []resolver.Address) {
@@ -67,5 +70,11 @@ func (t *ClientConn) Addrs() (addrs []resolver.Address) {
6770
return t.addrs
6871
}
6972

70-
func (*ClientConn) NewServiceConfig(string) {
73+
func (*ClientConn) NewServiceConfig(string) {}
74+
75+
func (t *ClientConn) SetUpdateStateReturnVal(v error) {
76+
t.mutex.Lock()
77+
defer t.mutex.Unlock()
78+
79+
t.updateStateReturnVal = v
7180
}

internal/mocks/consulhealth.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mocks
22

33
import (
44
"sync"
5+
"time"
56

67
consul "github.com/hashicorp/consul/api"
78
)
@@ -51,5 +52,7 @@ func (c *ConsulHealthClient) ServiceMultipleTags(service string, tags []string,
5152
return nil, nil, q.Context().Err()
5253
}
5354

55+
time.Sleep(q.WaitTime)
56+
5457
return c.entries, &c.queryMeta, c.err
5558
}

0 commit comments

Comments
 (0)