Skip to content

Commit 7d6bf28

Browse files
author
wencheng
committed
fix(discov): revalidate lease after watch re-put
1 parent b14d290 commit 7d6bf28

2 files changed

Lines changed: 181 additions & 67 deletions

File tree

core/discov/publisher.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,18 +152,11 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
152152
logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v",
153153
evt.Kv.Key, evt.Type)
154154

155-
// Make sure the lease is still valid before re-putting the key.
156-
// Otherwise the Put may happen with an already-expired or zero
157-
// LeaseID (e.g. when the DELETE event is caused by lease expiry
158-
// and races with KeepAlive channel close), which makes the key
159-
// permanent in etcd (no TTL) and leaks forever.
160-
ttlResp, ttlErr := cli.TimeToLive(cli.Ctx(), p.lease)
161-
if ttlErr != nil || ttlResp == nil || ttlResp.TTL <= 0 {
162-
logc.Errorf(cli.Ctx(),
163-
"etcd publisher lease expired, skip re-put and restart keepalive: leaseID=%d, err=%v",
164-
p.lease, ttlErr)
165-
p.revoke(cli)
166-
if err := p.doKeepAlive(); err != nil {
155+
// Keep the fast path for manually deleted keys, but revalidate both
156+
// before and after the re-put so an already-expired lease can't leave
157+
// a permanent orphaned key behind.
158+
if !p.leaseAlive(cli) {
159+
if err := p.restartKeepAlive(cli); err != nil {
167160
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
168161
}
169162
return
@@ -172,10 +165,18 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
172165
_, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease))
173166
if err != nil {
174167
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %v", err)
175-
} else {
176-
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s",
177-
p.fullKey, p.value)
168+
continue
178169
}
170+
171+
if !p.keyBoundToLease(cli) {
172+
if err := p.restartKeepAlive(cli); err != nil {
173+
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
174+
}
175+
return
176+
}
177+
178+
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s",
179+
p.fullKey, p.value)
179180
}
180181
}
181182
case <-p.pauseChan:
@@ -223,6 +224,45 @@ func (p *Publisher) revoke(cli internal.EtcdClient) {
223224
}
224225
}
225226

227+
func (p *Publisher) keyBoundToLease(cli internal.EtcdClient) bool {
228+
resp, err := cli.Get(cli.Ctx(), p.fullKey)
229+
if err != nil {
230+
logc.Errorf(cli.Ctx(), "etcd publisher verify re-put lease: %v", err)
231+
return false
232+
}
233+
234+
if len(resp.Kvs) == 0 {
235+
logc.Errorf(cli.Ctx(), "etcd publisher verify re-put lease: key missing after put, key=%s", p.fullKey)
236+
return false
237+
}
238+
239+
if resp.Kvs[0].Lease != int64(p.lease) {
240+
logc.Errorf(cli.Ctx(),
241+
"etcd publisher verify re-put lease: unexpected lease, key=%s, want=%d, got=%d",
242+
p.fullKey, p.lease, resp.Kvs[0].Lease)
243+
return false
244+
}
245+
246+
return true
247+
}
248+
249+
func (p *Publisher) leaseAlive(cli internal.EtcdClient) bool {
250+
resp, err := cli.TimeToLive(cli.Ctx(), p.lease)
251+
if err != nil || resp == nil || resp.TTL <= 0 {
252+
logc.Errorf(cli.Ctx(),
253+
"etcd publisher lease expired, skip re-put and restart keepalive: leaseID=%d, err=%v",
254+
p.lease, err)
255+
return false
256+
}
257+
258+
return true
259+
}
260+
261+
func (p *Publisher) restartKeepAlive(cli internal.EtcdClient) error {
262+
p.revoke(cli)
263+
return p.doKeepAlive()
264+
}
265+
226266
// WithId customizes a Publisher with the id.
227267
func WithId(id int64) PubOption {
228268
return func(publisher *Publisher) {

core/discov/publisher_test.go

Lines changed: 126 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) {
252252
wg.Wait()
253253
}
254254

255-
// Test case for key deletion and re-registration (covers lines 148-155)
256-
func TestPublisher_keepAliveAsyncKeyDeletion(t *testing.T) {
255+
func TestPublisher_keepAliveAsyncDeleteRePutsWhenLeaseAlive(t *testing.T) {
257256
ctrl := gomock.NewController(t)
258257
defer ctrl.Finish()
259258
const id clientv3.LeaseID = 1
@@ -262,33 +261,24 @@ func TestPublisher_keepAliveAsyncKeyDeletion(t *testing.T) {
262261
defer restore()
263262
cli.EXPECT().Ctx().AnyTimes()
264263
cli.EXPECT().KeepAlive(gomock.Any(), id)
265-
266-
// Create a watch channel that will send a delete event
267-
watchChan := make(chan clientv3.WatchResponse, 1)
268-
watchResp := clientv3.WatchResponse{
269-
Events: []*clientv3.Event{{
270-
Type: clientv3.EventTypeDelete,
271-
Kv: &mvccpb.KeyValue{
272-
Key: []byte("thekey"),
273-
},
274-
}},
275-
}
276-
watchChan <- watchResp
277-
278-
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan))
264+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(newDeleteWatchChan("thekey"))
265+
cli.EXPECT().TimeToLive(gomock.Any(), id).Return(&clientv3.LeaseTimeToLiveResponse{
266+
TTL: 1,
267+
}, nil)
279268

280269
var wg sync.WaitGroup
281-
wg.Add(1) // Only wait for Revoke call
282-
283-
// Use a channel to signal when Put has been called
270+
wg.Add(1)
284271
putCalled := make(chan struct{})
285-
286-
// Expect the re-put operation when key is deleted
287272
cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) {
288-
close(putCalled) // Signal that Put has been called
273+
close(putCalled)
289274
}).Return(nil, nil)
290-
291-
// Expect revoke when Stop is called
275+
cli.EXPECT().Get(gomock.Any(), "thekey").Return(&clientv3.GetResponse{
276+
Kvs: []*mvccpb.KeyValue{{
277+
Key: []byte("thekey"),
278+
Value: []byte("thevalue"),
279+
Lease: int64(id),
280+
}},
281+
}, nil)
292282
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
293283
wg.Done()
294284
})
@@ -305,8 +295,7 @@ func TestPublisher_keepAliveAsyncKeyDeletion(t *testing.T) {
305295
wg.Wait()
306296
}
307297

308-
// Test case for key deletion with re-put error (covers error branch in lines 151-152)
309-
func TestPublisher_keepAliveAsyncKeyDeletionPutError(t *testing.T) {
298+
func TestPublisher_keepAliveAsyncDeleteSkipsPutAndRestartsWhenLeaseExpired(t *testing.T) {
310299
ctrl := gomock.NewController(t)
311300
defer ctrl.Finish()
312301
const id clientv3.LeaseID = 1
@@ -315,47 +304,108 @@ func TestPublisher_keepAliveAsyncKeyDeletionPutError(t *testing.T) {
315304
defer restore()
316305
cli.EXPECT().Ctx().AnyTimes()
317306
cli.EXPECT().KeepAlive(gomock.Any(), id)
307+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(newDeleteWatchChan("thekey"))
308+
cli.EXPECT().TimeToLive(gomock.Any(), id).Return(&clientv3.LeaseTimeToLiveResponse{
309+
TTL: 0,
310+
}, nil)
318311

319-
// Create a watch channel that will send a delete event
320-
watchChan := make(chan clientv3.WatchResponse, 1)
321-
watchResp := clientv3.WatchResponse{
322-
Events: []*clientv3.Event{{
323-
Type: clientv3.EventTypeDelete,
324-
Kv: &mvccpb.KeyValue{
325-
Key: []byte("thekey"),
326-
},
327-
}},
328-
}
329-
watchChan <- watchResp
312+
restarted := make(chan struct{})
313+
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
314+
close(restarted)
315+
})
330316

331-
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan))
317+
pub := NewPublisher(nil, "thekey", "thevalue")
318+
pub.lease = id
319+
pub.fullKey = "thekey"
332320

333-
var wg sync.WaitGroup
334-
wg.Add(1) // Only wait for Revoke call
321+
assert.Nil(t, pub.keepAliveAsync(cli))
322+
waitForSignal(t, restarted)
323+
pub.Stop()
324+
}
335325

336-
// Use a channel to signal when Put has been called
337-
putCalled := make(chan struct{})
326+
func TestPublisher_keepAliveAsyncDeleteSkipsPutWhenTTLUnavailable(t *testing.T) {
327+
tests := []struct {
328+
name string
329+
resp *clientv3.LeaseTimeToLiveResponse
330+
err error
331+
leaseID clientv3.LeaseID
332+
}{
333+
{
334+
name: "ttl error",
335+
err: errors.New("ttl error"),
336+
leaseID: 1,
337+
},
338+
{
339+
name: "ttl nil response",
340+
resp: nil,
341+
leaseID: 2,
342+
},
343+
}
338344

339-
// Expect the re-put operation to fail
340-
cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) {
341-
close(putCalled) // Signal that Put has been called
342-
}).Return(nil, errors.New("put error"))
345+
for _, tt := range tests {
346+
t.Run(tt.name, func(t *testing.T) {
347+
ctrl := gomock.NewController(t)
348+
defer ctrl.Finish()
349+
350+
cli := internal.NewMockEtcdClient(ctrl)
351+
restore := setMockClient(cli)
352+
defer restore()
353+
354+
cli.EXPECT().Ctx().AnyTimes()
355+
cli.EXPECT().KeepAlive(gomock.Any(), tt.leaseID)
356+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(newDeleteWatchChan("thekey"))
357+
cli.EXPECT().TimeToLive(gomock.Any(), tt.leaseID).Return(tt.resp, tt.err)
358+
359+
restarted := make(chan struct{})
360+
cli.EXPECT().Revoke(gomock.Any(), tt.leaseID).Do(func(_, _ any) {
361+
close(restarted)
362+
})
363+
364+
pub := NewPublisher(nil, "thekey", "thevalue")
365+
pub.lease = tt.leaseID
366+
pub.fullKey = "thekey"
367+
368+
assert.Nil(t, pub.keepAliveAsync(cli))
369+
waitForSignal(t, restarted)
370+
pub.Stop()
371+
})
372+
}
373+
}
374+
375+
func TestPublisher_keepAliveAsyncDeleteRestartsWhenRePutLosesLease(t *testing.T) {
376+
ctrl := gomock.NewController(t)
377+
defer ctrl.Finish()
378+
const id clientv3.LeaseID = 1
379+
cli := internal.NewMockEtcdClient(ctrl)
380+
restore := setMockClient(cli)
381+
defer restore()
382+
cli.EXPECT().Ctx().AnyTimes()
383+
cli.EXPECT().KeepAlive(gomock.Any(), id)
384+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(newDeleteWatchChan("thekey"))
385+
cli.EXPECT().TimeToLive(gomock.Any(), id).Return(&clientv3.LeaseTimeToLiveResponse{
386+
TTL: 1,
387+
}, nil)
388+
cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Return(nil, nil)
389+
cli.EXPECT().Get(gomock.Any(), "thekey").Return(&clientv3.GetResponse{
390+
Kvs: []*mvccpb.KeyValue{{
391+
Key: []byte("thekey"),
392+
Value: []byte("thevalue"),
393+
Lease: 0,
394+
}},
395+
}, nil)
343396

344-
// Expect revoke when Stop is called
397+
restarted := make(chan struct{})
345398
cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) {
346-
wg.Done()
399+
close(restarted)
347400
})
348401

349402
pub := NewPublisher(nil, "thekey", "thevalue")
350403
pub.lease = id
351404
pub.fullKey = "thekey"
352405

353406
assert.Nil(t, pub.keepAliveAsync(cli))
354-
355-
// Wait for Put to be called, then stop
356-
<-putCalled
407+
waitForSignal(t, restarted)
357408
pub.Stop()
358-
wg.Wait()
359409
}
360410

361411
func TestPublisher_Resume(t *testing.T) {
@@ -462,3 +512,27 @@ func createTempFile(t *testing.T, body []byte) string {
462512

463513
return tmpFile.Name()
464514
}
515+
516+
func newDeleteWatchChan(key string) <-chan clientv3.WatchResponse {
517+
watchChan := make(chan clientv3.WatchResponse, 1)
518+
watchChan <- clientv3.WatchResponse{
519+
Events: []*clientv3.Event{{
520+
Type: clientv3.EventTypeDelete,
521+
Kv: &mvccpb.KeyValue{
522+
Key: []byte(key),
523+
},
524+
}},
525+
}
526+
527+
return watchChan
528+
}
529+
530+
func waitForSignal(t *testing.T, ch <-chan struct{}) {
531+
t.Helper()
532+
533+
select {
534+
case <-ch:
535+
case <-time.After(time.Second):
536+
t.Fatal("timed out waiting for signal")
537+
}
538+
}

0 commit comments

Comments
 (0)