Skip to content

Commit 2496e8a

Browse files
committed
Fix terminator bug
1 parent cb55912 commit 2496e8a

2 files changed

Lines changed: 78 additions & 25 deletions

File tree

append_lifecycle.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -503,8 +503,9 @@ type terminator struct {
503503
mu sync.RWMutex
504504
stopped bool
505505

506-
// largestIssued tracks the largest index allocated by this appender.
507-
largestIssued atomic.Uint64
506+
// wantTreeSize tracks a tree size which is large enough to cover all
507+
// locally added entries.
508+
wantTreeSize atomic.Uint64
508509

509510
// shutdownTimeout is the maximum duration to wait for the shutdown process to complete.
510511
shutdownTimeout time.Duration
@@ -525,12 +526,13 @@ func (t *terminator) Add(ctx context.Context, entry *Entry) IndexFuture {
525526
return i, err
526527
}
527528

528-
// https://github.com/golang/go/issues/63999 - atomically set largest issued index
529-
old := t.largestIssued.Load()
530-
for old < i.Index && !t.largestIssued.CompareAndSwap(old, i.Index) {
531-
old = t.largestIssued.Load()
529+
// https://github.com/golang/go/issues/63999 - atomically set minimum tree size for shutdown.
530+
wantSize := i.Index + 1
531+
old := t.wantTreeSize.Load()
532+
for old < wantSize && !t.wantTreeSize.CompareAndSwap(old, wantSize) {
533+
old = t.wantTreeSize.Load()
532534
}
533-
appenderHighestIndex.Record(ctx, otel.Clamp64(t.largestIssued.Load()))
535+
appenderHighestIndex.Record(ctx, otel.Clamp64(t.wantTreeSize.Load()-1))
534536

535537
return i, err
536538
}
@@ -546,8 +548,9 @@ func (t *terminator) Shutdown(ctx context.Context) error {
546548
t.mu.Lock()
547549
defer t.mu.Unlock()
548550
t.stopped = true
549-
maxIndex := t.largestIssued.Load()
550-
if maxIndex == 0 {
551+
552+
wantSize := t.wantTreeSize.Load()
553+
if wantSize == 0 {
551554
// special case no work done
552555
return nil
553556
}
@@ -580,8 +583,8 @@ func (t *terminator) Shutdown(ctx context.Context) error {
580583
if err != nil {
581584
return err
582585
}
583-
slog.DebugContext(gCtx, "Shutting down, waiting for checkpoint", slog.Uint64("goal", maxIndex), slog.Uint64("current", size))
584-
if size > maxIndex {
586+
slog.DebugContext(gCtx, "Shutting down, waiting for checkpoint", slog.Uint64("goal", wantSize), slog.Uint64("current", size))
587+
if size >= wantSize {
585588
return nil
586589
}
587590
}
@@ -604,8 +607,8 @@ func (t *terminator) Shutdown(ctx context.Context) error {
604607
if err != nil {
605608
return err
606609
}
607-
slog.DebugContext(gCtx, "Shutting down, waiting for follower", slog.String("follower", f.Name()), slog.Uint64("goal", maxIndex), slog.Uint64("processed", processed))
608-
if processed > maxIndex {
610+
slog.DebugContext(gCtx, "Shutting down, waiting for follower", slog.String("follower", f.Name()), slog.Uint64("goal", wantSize), slog.Uint64("processed", processed))
611+
if processed >= wantSize {
609612
return nil
610613
}
611614
}

append_lifecycle_test.go

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package tessera
1616

1717
import (
1818
"context"
19+
"errors"
20+
"fmt"
1921
"strings"
2022
"testing"
2123
"time"
@@ -150,22 +152,70 @@ func mustCreateSigner(t *testing.T, k string) note.Signer {
150152
return s
151153
}
152154

153-
func TestShutdownTimeout(t *testing.T) {
155+
func TestShutdownBehavior(t *testing.T) {
156+
tests := []struct {
157+
name string
158+
wantTreeSize uint64
159+
cpSize uint64
160+
expectWait bool
161+
}{
162+
{
163+
name: "no work done",
164+
wantTreeSize: 0,
165+
expectWait: false,
166+
},
167+
{
168+
name: "wait for index 0",
169+
wantTreeSize: 1,
170+
cpSize: 0,
171+
expectWait: true,
172+
},
173+
{
174+
name: "already caught up",
175+
wantTreeSize: 1,
176+
cpSize: 1,
177+
expectWait: false,
178+
},
179+
}
180+
181+
for _, test := range tests {
182+
t.Run(test.name, func(t *testing.T) {
183+
term := &terminator{
184+
readCheckpoint: func(ctx context.Context) ([]byte, error) {
185+
// Return a valid checkpoint string that parse.CheckpointUnsafe can parse.
186+
return []byte(fmt.Sprintf("example.com\n%d\nqINS1GRFhWHwdkUeqLEoP4yEMkTBBzxBkGwGQlVlVcs=\n", test.cpSize)), nil
187+
},
188+
shutdownTimeout: 10 * time.Millisecond,
189+
}
190+
term.wantTreeSize.Store(test.wantTreeSize)
191+
192+
// If we've added an entry, then the terminator should wait for a checkpoint covering it.
193+
// Since we don't provide any checkpoints, we can detect this by waiting for it to timeout.
194+
err := term.Shutdown(t.Context())
195+
if gotTimeout := errors.Is(err, context.DeadlineExceeded); gotTimeout != test.expectWait {
196+
t.Fatalf("Expected timeout error from waiting for checkpoint to catch up: %v, got timeout: %v, err: %v", test.expectWait, gotTimeout, err)
197+
}
198+
})
199+
}
200+
}
201+
202+
func TestAddUpdatesWantTreeSize(t *testing.T) {
203+
wantIdx := uint64(5)
154204
term := &terminator{
155-
readCheckpoint: func(ctx context.Context) ([]byte, error) {
156-
// simulate a call that doesn't finish, just block or wait
157-
<-ctx.Done()
158-
return nil, ctx.Err()
205+
delegate: func(_ context.Context, _ *Entry) IndexFuture {
206+
return func() (Index, error) {
207+
return Index{Index: wantIdx}, nil
208+
}
159209
},
160210
}
161-
term.largestIssued.Store(1)
162-
term.shutdownTimeout = 10 * time.Millisecond
163211

164-
err := term.Shutdown(context.Background())
165-
if err == nil {
166-
t.Fatal("Expected timeout error, got nil")
212+
f := term.Add(t.Context(), nil)
213+
if _, err := f(); err != nil {
214+
t.Fatal(err)
167215
}
168-
if !strings.Contains(err.Error(), "deadline exceeded") && !strings.Contains(err.Error(), "context canceled") {
169-
t.Fatalf("Expected context deadline exceeded or canceled error, got: %v", err)
216+
217+
if got := term.wantTreeSize.Load(); got != wantIdx+1 {
218+
t.Fatalf("wantTreeSize should be %d after adding index %d, got %d", wantIdx+1, wantIdx, got)
170219
}
171220
}
221+

0 commit comments

Comments
 (0)