Skip to content

Commit 9dc3294

Browse files
authored
Implement MirrorTarget AddEntries (#1015)
Flesh-out more of the MirrorTarget lifecycle.
1 parent 85036cb commit 9dc3294

2 files changed

Lines changed: 333 additions & 3 deletions

File tree

mirror_lifecycle.go

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@
1515
package tessera
1616

1717
import (
18+
"bytes"
1819
"context"
20+
"encoding/gob"
21+
"encoding/hex"
1922
"errors"
2023
"fmt"
24+
"io"
2125
"iter"
26+
"log/slog"
2227

2328
"github.com/transparency-dev/tessera/api"
2429
"github.com/transparency-dev/tessera/api/layout"
30+
"github.com/transparency-dev/tessera/internal/parse"
2531
"golang.org/x/mod/sumdb/note"
2632
)
2733

@@ -133,12 +139,144 @@ type MirrorPackage struct {
133139
// AddEntries processes a stream of entry packages, verifies subtree consistency proofs,
134140
// and durably commits entries to the log.
135141
//
136-
// Returns the next required entry index, a recent pending checkpoint size, an opaque ticket for future invocations, and, optionally, a cosignature over a pending checkpoint whose size matches uploadEnd if one exists.
137-
func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*MirrorPackage, error)) (uint64, uint64, []byte, []byte, error) {
138-
return 0, 0, nil, nil, errors.New("unimplemented")
142+
// Returns the next required entry index, a recent pending checkpoint size, an opaque
143+
// ticket for future invocations, and, optionally, a cosignature over a pending checkpoint
144+
// whose size matches uploadEnd if one exists.
145+
func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticketBytes []byte, next func() (*MirrorPackage, error)) (nextEntry uint64, pendingSize uint64, newTicket []byte, cosigs []byte, err error) {
146+
curIntegratedSize, err := mt.reader.IntegratedSize(ctx)
147+
if err != nil {
148+
return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err)
149+
}
150+
var t *ticket
151+
if t, err = mt.openTicket(ctx, ticketBytes); err != nil {
152+
// Invalid or empty ticket, return a new one.
153+
pendingCP, err := mt.cpSource(ctx)
154+
if err != nil {
155+
return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err)
156+
}
157+
if len(pendingCP) == 0 {
158+
return 0, 0, nil, nil, ErrNoPendingCheckpoint
159+
}
160+
t = &ticket{
161+
PendingCP: pendingCP,
162+
}
163+
ticketBytes, err = mt.sealTicket(ctx, t)
164+
if err != nil {
165+
return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err)
166+
}
167+
168+
// If the client didn't provide a [valid] ticket, then we don't have a pending
169+
// checkpoint to validate against, so we return a new ticket with the
170+
// current checkpoint.
171+
_, pendingSize, _, err := parse.CheckpointUnsafe(t.PendingCP)
172+
if err != nil {
173+
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
174+
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err)
175+
}
176+
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
177+
}
178+
179+
var pendingRoot []byte
180+
_, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(t.PendingCP)
181+
if err != nil {
182+
slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
183+
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err)
184+
}
185+
186+
// Handle 409 Conflicts:
187+
// - Zero-request check: If upload_start == 0 and upload_end == 0, the client is
188+
// requesting initial mirror information.
189+
// - upload_end:
190+
// * MUST be equal to the tree size of a known pending checkpoint value.
191+
// * MUST NOT be less than the mirror checkpoint's tree size.
192+
// - upload_start:
193+
// * MUST NOT be greater than the mirror's next expected entry index.
194+
// * MUST NOT be too far below the mirror's next entry index.
195+
if (uploadStart == 0 && uploadEnd == 0) ||
196+
(uploadEnd != pendingSize || uploadEnd < curIntegratedSize) ||
197+
(uploadStart > curIntegratedSize) {
198+
// TODO(al): add flexibility about re-writing some entries
199+
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
200+
}
201+
202+
203+
bi := func(yield func(api.EntryBundle) bool) {
204+
for {
205+
pkg, err := next()
206+
if err != nil {
207+
if err == io.EOF {
208+
return
209+
}
210+
// TODO(al): handle this
211+
slog.WarnContext(ctx, "NextPackage returned an error", slog.String("error", err.Error()))
212+
return
213+
}
214+
215+
// TODO(al): verify entries+proof under checkpoint (Failure -> 422 Unprocessable Entity).
216+
217+
if !yield(api.EntryBundle{Entries: pkg.Entries}) {
218+
return
219+
}
220+
}
221+
}
222+
223+
// TODO(al): Check uploadStart is aligned to EntryBundleWidth.
224+
bundleIdx := uploadStart/layout.EntryBundleWidth
225+
226+
nextEntry, newRoot, err := mt.writer.IntegrateBundles(ctx, bundleIdx, bi)
227+
switch {
228+
case err != nil:
229+
return 0, 0, nil, nil, err
230+
case nextEntry == pendingSize:
231+
if !bytes.Equal(pendingRoot, newRoot) {
232+
slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(t.PendingCP)))
233+
return 0, 0, nil, nil, errors.New("internal error")
234+
}
235+
// This is a complete upload.
236+
// TODO(al):
237+
// - cosign the pending checkpoint,
238+
// - publish it IFF we not overwriting a larger checkpoint
239+
// - If published, then return the cosig(s) to the caller.
240+
return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil
241+
case nextEntry > pendingSize:
242+
// TODO(al): ticket is stale, probably need to update the ticket?
243+
slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingSize))
244+
return nextEntry, pendingSize, ticketBytes, nil, nil
245+
default:
246+
// Incomplete upload, return an updated ticket with the current checkpoint.
247+
return nextEntry, pendingSize, ticketBytes, nil, nil
248+
}
139249
}
140250

141251
// IntegratedSize returns the size of the current integrated log.
142252
func (mt *MirrorTarget) IntegratedSize(ctx context.Context) (uint64, error) {
143253
return mt.reader.IntegratedSize(ctx)
144254
}
255+
256+
// ticket is the underlying structure of an add-entries ticket.
257+
type ticket struct {
258+
// PendingCP holds the raw pending checkpoint bytes.
259+
PendingCP []byte
260+
}
261+
262+
func (mt *MirrorTarget) sealTicket(ctx context.Context, t *ticket) ([]byte, error) {
263+
out := bytes.Buffer{}
264+
if err := gob.NewEncoder(&out).Encode(t); err != nil {
265+
return nil, fmt.Errorf("ticket encoding failed: %v", err)
266+
}
267+
// TODO(al): harden ticket & bind to this particular log mirror.
268+
return out.Bytes(), nil
269+
}
270+
271+
func (mt *MirrorTarget) openTicket(ctx context.Context, ticketBytes []byte) (*ticket, error) {
272+
if len(ticketBytes) == 0 {
273+
return nil, errors.New("empty ticket")
274+
}
275+
// TODO(al): harden ticket & verify it's for this particular log mirror.
276+
var t ticket
277+
if err := gob.NewDecoder(bytes.NewReader(ticketBytes)).Decode(&t); err != nil {
278+
return nil, fmt.Errorf("ticket decoding failed: %v", err)
279+
}
280+
return &t, nil
281+
}
282+

mirror_lifecycle_test.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Copyright 2026 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tessera
16+
17+
import (
18+
"context"
19+
"encoding/base64"
20+
"errors"
21+
"fmt"
22+
"io"
23+
"iter"
24+
"testing"
25+
26+
"github.com/transparency-dev/tessera/api"
27+
)
28+
29+
type fakeMirrorWriter struct {
30+
integrateFunc func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error)
31+
sizeFunc func(ctx context.Context) (uint64, error)
32+
}
33+
34+
func (f *fakeMirrorWriter) IntegrateBundles(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
35+
if f.integrateFunc != nil {
36+
return f.integrateFunc(ctx, from, bundles)
37+
}
38+
return from, nil, nil
39+
}
40+
41+
func (f *fakeMirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) {
42+
if f.sizeFunc != nil {
43+
return f.sizeFunc(ctx)
44+
}
45+
return 0, nil
46+
}
47+
48+
type fakeLogReader struct {
49+
sizeFunc func(ctx context.Context) (uint64, error)
50+
}
51+
52+
func (f *fakeLogReader) IntegratedSize(ctx context.Context) (uint64, error) {
53+
if f.sizeFunc != nil {
54+
return f.sizeFunc(ctx)
55+
}
56+
return 0, nil
57+
}
58+
func (f *fakeLogReader) ReadCheckpoint(ctx context.Context) ([]byte, error) { return nil, nil }
59+
func (f *fakeLogReader) ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) {
60+
return nil, nil
61+
}
62+
func (f *fakeLogReader) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) {
63+
return nil, nil
64+
}
65+
func (f *fakeLogReader) NextIndex(ctx context.Context) (uint64, error) { return 0, nil }
66+
67+
const (
68+
testPendingCPOrigin = "test-origin"
69+
testPendingCPSize = uint64(200)
70+
testPendingCPRoot = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="
71+
)
72+
73+
var testPendingCP = fmt.Sprintf("%s\n%d\n%s\n— test-sig\n", testPendingCPOrigin, testPendingCPSize, testPendingCPRoot)
74+
75+
func newTestMirrorTarget(size uint64) *MirrorTarget {
76+
return &MirrorTarget{
77+
writer: &fakeMirrorWriter{
78+
sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil },
79+
},
80+
reader: &fakeLogReader{
81+
sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil },
82+
},
83+
cpSource: func(ctx context.Context) ([]byte, error) {
84+
return []byte(testPendingCP), nil
85+
},
86+
}
87+
}
88+
89+
func TestMirrorTarget_AddEntries_NoTicket(t *testing.T) {
90+
const (
91+
testIntegratedSize = uint64(100)
92+
)
93+
ctx := context.Background()
94+
mt := newTestMirrorTarget(testIntegratedSize)
95+
96+
nextEntry, pendingSize, newTicket, _, err := mt.AddEntries(ctx, 0, 0, nil, func() (*MirrorPackage, error) {
97+
return nil, io.EOF
98+
})
99+
if !errors.Is(err, ErrConflict) {
100+
t.Errorf("got %v, want ErrConflict", err)
101+
}
102+
if got, want := nextEntry, testIntegratedSize; got != want {
103+
t.Errorf("got %d, want %d", got, want)
104+
}
105+
if got, want := pendingSize, testPendingCPSize; got != want {
106+
t.Errorf("got %d, want %d", got, want)
107+
}
108+
if len(newTicket) == 0 {
109+
t.Errorf("got empty ticket, want non-empty")
110+
}
111+
}
112+
113+
func TestMirrorTarget_AddEntries_RangeConflict(t *testing.T) {
114+
const (
115+
testUploadStart = uint64(100)
116+
testUploadEnd = uint64(250)
117+
testPendingSize = uint64(200)
118+
testIntegratedSize = uint64(150)
119+
)
120+
ctx := context.Background()
121+
mt := &MirrorTarget{
122+
writer: &fakeMirrorWriter{
123+
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
124+
},
125+
reader: &fakeLogReader{
126+
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
127+
},
128+
cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil },
129+
}
130+
131+
validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)})
132+
if err != nil {
133+
t.Fatalf("sealTicket failed: %v", err)
134+
}
135+
136+
// testUploadEnd != pendingSize -> conflict
137+
_, _, _, _, err = mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) {
138+
return nil, io.EOF
139+
})
140+
if !errors.Is(err, ErrConflict) {
141+
t.Errorf("want ErrConflict, got %v", err)
142+
}
143+
}
144+
145+
func TestMirrorTarget_AddEntries_CompleteUpload(t *testing.T) {
146+
const (
147+
testIntegratedSize = uint64(100)
148+
testUploadStart = uint64(100)
149+
testUploadEnd = uint64(200)
150+
)
151+
152+
ctx := context.Background()
153+
mt := &MirrorTarget{
154+
writer: &fakeMirrorWriter{
155+
integrateFunc: func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
156+
// Consume iterator
157+
for range bundles {
158+
}
159+
decodedRoot, err := base64.StdEncoding.DecodeString(testPendingCPRoot)
160+
if err != nil {
161+
return 0, nil, fmt.Errorf("TEST ERROR: %v", err)
162+
}
163+
return testPendingCPSize, decodedRoot, nil
164+
},
165+
},
166+
reader: &fakeLogReader{
167+
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
168+
},
169+
cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil },
170+
}
171+
172+
validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)})
173+
if err != nil {
174+
t.Fatalf("sealTicket failed: %v", err)
175+
}
176+
177+
nextEntry, pendingSize, _, cosigs, err := mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) {
178+
return nil, io.EOF
179+
})
180+
if err != nil {
181+
t.Errorf("unexpected error: %v", err)
182+
}
183+
if got, want := nextEntry, testUploadEnd; got != want {
184+
t.Errorf("got %d, want %d", got, want)
185+
}
186+
if got, want := pendingSize, testUploadEnd; got != want {
187+
t.Errorf("got %d, want %d", got, want)
188+
}
189+
if len(cosigs) == 0 {
190+
t.Errorf("got empty cosigs, want non-empty")
191+
}
192+
}

0 commit comments

Comments
 (0)