Skip to content

Commit 72b3ec8

Browse files
authored
Memoize the outer-most AddFn (#711)
1 parent 2b8208b commit 72b3ec8

3 files changed

Lines changed: 84 additions & 8 deletions

File tree

append_lifecycle.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,28 @@ func NewAppender(ctx context.Context, d Driver, opts *AppendOptions) (*Appender,
258258
ctx, span := tracer.Start(ctx, "tessera.Appender.Add")
259259
defer span.End()
260260

261-
return t.Add(ctx, entry)
261+
// NOTE: We memoize the returned value here so that repeated calls to the returned
262+
// future don't result in unexpected side-effects from inner AddFn functions
263+
// being called multiple times.
264+
// Currently this is the outermost wrapping of Add so we do the memoization
265+
// here, if this changes, ensure that we move the memoization call so that
266+
// this remains true.
267+
return memoizeFuture(t.Add(ctx, entry))
262268
}
263269
return a, t.Shutdown, r, nil
264270
}
265271

272+
// memoizeFuture wraps an AddFn delegate with logic to ensure that the delegate is called at most
273+
// once.
274+
func memoizeFuture(delegate IndexFuture) IndexFuture {
275+
f := sync.OnceValues(func() (Index, error) {
276+
return delegate()
277+
})
278+
return func() (Index, error) {
279+
return f()
280+
}
281+
}
282+
266283
func followerStats(ctx context.Context, f Follower, size func(context.Context) (uint64, error)) {
267284
name := f.Name()
268285
t := time.NewTicker(200 * time.Millisecond)

append_lifecycle_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2025 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tessera
16+
17+
import (
18+
"context"
19+
"testing"
20+
)
21+
22+
func TestMemoize(t *testing.T) {
23+
// Set up an AddFn which will increment a counter every time it's called, and return that in the Index.
24+
i := uint64(0)
25+
deleg := func() (Index, error) {
26+
i++
27+
return Index{
28+
Index: i,
29+
}, nil
30+
}
31+
add := func(_ context.Context, _ *Entry) IndexFuture {
32+
return deleg
33+
}
34+
35+
// Create a single future (for a single Entry), and convince ourselves that the counter is being incremented
36+
// each time the future is being invoked.
37+
f1 := add(nil, nil)
38+
a, _ := f1()
39+
b, _ := f1()
40+
if a.Index == b.Index {
41+
t.Fatalf("a(=%d) == b(=%d)", a.Index, b.Index)
42+
}
43+
44+
// Now create an AddFn which memoizes the result of the delegate, like we do in NewAppender, and assert that
45+
// repeated calls to the future work as expected; only incrementing the counter once.
46+
add = func(_ context.Context, _ *Entry) IndexFuture {
47+
return memoizeFuture(deleg)
48+
}
49+
f2 := add(nil, nil)
50+
c, _ := f2()
51+
d, _ := f2()
52+
53+
if c.Index != d.Index {
54+
t.Fatalf("c(=%d) != d(=%d)", c.Index, d.Index)
55+
}
56+
}

client/stream_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ func TestEntryBundles(t *testing.T) {
4242
}
4343
}()
4444

45-
if _, err := populateEntries(ctx, tl, logSize1, "first"); err != nil {
45+
if _, err := populateEntries(t, tl, logSize1, "first"); err != nil {
4646
t.Fatalf("populateEntries(first): %v", err)
4747
}
48-
if _, err := populateEntries(ctx, tl, logSize2-logSize1, "second"); err != nil {
48+
if _, err := populateEntries(t, tl, logSize2-logSize1, "second"); err != nil {
4949
t.Fatalf("populateEntries(second): %v", err)
5050
}
5151

@@ -95,7 +95,7 @@ func TestEntries(t *testing.T) {
9595
}()
9696

9797
// Put some entries into a log.
98-
es, err := populateEntries(ctx, tl, logSize, "first")
98+
es, err := populateEntries(t, tl, logSize, "first")
9999
if err != nil {
100100
t.Fatalf("populateEntries(): %v", err)
101101
}
@@ -140,18 +140,21 @@ func TestEntries(t *testing.T) {
140140
}
141141
}
142142

143-
func populateEntries(ctx context.Context, tl *testonly.TestLog, N uint64, ep string) ([][]byte, error) {
143+
func populateEntries(t *testing.T, tl *testonly.TestLog, N uint64, ep string) ([][]byte, error) {
144+
t.Helper()
145+
144146
es := make([][]byte, 0, N)
145147
fs := make([]tessera.IndexFuture, 0, N)
146148
for i := range N {
147149
e := fmt.Appendf(nil, "%s-%d", ep, i)
148150
es = append(es, e)
149-
fs = append(fs, tl.Appender.Add(ctx, tessera.NewEntry(e)))
151+
fs = append(fs, tl.Appender.Add(t.Context(), tessera.NewEntry(e)))
150152
}
153+
t.Logf("Added %d entries", N)
151154

152-
a := tessera.NewPublicationAwaiter(ctx, tl.LogReader.ReadCheckpoint, time.Second)
155+
a := tessera.NewPublicationAwaiter(t.Context(), tl.LogReader.ReadCheckpoint, time.Second)
153156
for _, f := range fs {
154-
if _, _, err := a.Await(ctx, f); err != nil {
157+
if _, _, err := a.Await(t.Context(), f); err != nil {
155158
return nil, err
156159
}
157160
}

0 commit comments

Comments
 (0)