Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 159 additions & 37 deletions mirror_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import (
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"iter"
"log/slog"
"strings"
"unicode"
"unicode/utf8"

"github.com/transparency-dev/formats/log"
"github.com/transparency-dev/tessera/api"
"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/parse"
"golang.org/x/mod/sumdb/note"
)

Expand Down Expand Up @@ -105,16 +110,23 @@ type MirrorWriter interface {
IntegrateBundles(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error)
// IntegratedSize returns the size of the local integrated tree.
IntegratedSize(ctx context.Context) (uint64, error)

// UpdateCheckpoint MUST atomically update the local published checkpoint for the log mirror.
// The provided function f should be called with the contents of the currently published checkpoint,
// this will be of zero length if there is no currently published checkpoint, and should
// return the new serialised checkpoint or an error. If the function returns an error, the currently
// published checkpoint MUST NOT be altered.
UpdateCheckpoint(ctx context.Context, f func(oldCP []byte) (newCP []byte, err error)) error
}

// MirrorTarget is a high-level wrapper that manages the process of mirroring
// a source log into a Tessera instance.
// MirrorTarget manages the process of mirroring a source log into a Tessera instance.
type MirrorTarget struct {
writer MirrorWriter
reader LogReader
cpSource func(context.Context) ([]byte, error)
signer note.Signer
ticketKey []byte
writer MirrorWriter
reader LogReader
cpSource func(context.Context) ([]byte, error)
signer note.Signer
logVerifier note.Verifier
ticketKey []byte
}

// NewMirrorTarget instantiates a new MirrorTarget for the given driver and options.
Expand All @@ -141,11 +153,12 @@ func NewMirrorTarget(ctx context.Context, d Driver, opts *MirrorOptions) (*Mirro
return nil, fmt.Errorf("failed to derive ticket key: %v", err)
}
return &MirrorTarget{
writer: mw,
reader: r,
cpSource: opts.cpSource,
signer: opts.signer,
ticketKey: tK,
writer: mw,
reader: r,
cpSource: opts.cpSource,
signer: opts.signer,
logVerifier: opts.logVerifier,
ticketKey: tK,
}, nil
}

Expand Down Expand Up @@ -184,6 +197,8 @@ type MirrorPackage struct {
// ticket for future invocations, and, optionally, a cosignature over a pending checkpoint
// whose size matches uploadEnd if one exists.
func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticketBytes []byte, next func() (*MirrorPackage, error)) (nextEntry uint64, pendingSize uint64, newTicket []byte, cosigs []byte, err error) {
// TODO(al) handle non-aligned uploadStart.

curIntegratedSize, err := mt.reader.IntegratedSize(ctx)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err)
Expand All @@ -195,36 +210,36 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
// If the client didn't provide a [valid] ticket, then we don't have a pending
// checkpoint to validate against, so we return a new ticket with the
// current checkpoint.
pendingCP, err := mt.cpSource(ctx)
pendingCPRaw, err := mt.cpSource(ctx)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err)
}
if len(pendingCP) == 0 {
if len(pendingCPRaw) == 0 {
return 0, 0, nil, nil, ErrNoPendingCheckpoint
}
ticketBytes, err = mt.sealTicket(pendingCP)
ticketBytes, err = mt.sealTicket(pendingCPRaw)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err)
}

_, pendingSize, _, err := parse.CheckpointUnsafe(pendingCP)
pendingCP, _, _, err := log.ParseCheckpoint(pendingCPRaw, mt.logVerifier.Name(), mt.logVerifier)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(pendingCP)), slog.String("error", err.Error()))
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(pendingCPRaw)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err)
}

slog.DebugContext(ctx, "Returning new ticket", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize))
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
slog.DebugContext(ctx, "Returning new ticket", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingCP.Size))

return curIntegratedSize, pendingCP.Size, ticketBytes, nil, ErrConflict
}

var pendingRoot []byte
_, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(ticketCP)
pendingCP, _, pendingNote, err := log.ParseCheckpoint(ticketCP, mt.logVerifier.Name(), mt.logVerifier)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(ticketCP)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err)
}

slog.DebugContext(ctx, "Valid ticket, proceeding", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize))
slog.DebugContext(ctx, "Valid ticket, proceeding", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingCP.Size))
// Handle 409 Conflicts:
// - Zero-request check: If upload_start == 0 and upload_end == 0, the client is
// requesting initial mirror information.
Expand All @@ -235,11 +250,11 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
// * MUST NOT be greater than the mirror's next expected entry index.
// * MUST NOT be too far below the mirror's next entry index.
if (uploadStart == 0 && uploadEnd == 0) ||
(uploadEnd != pendingSize || uploadEnd < curIntegratedSize) ||
(uploadEnd != pendingCP.Size || uploadEnd < curIntegratedSize) ||
(uploadStart > curIntegratedSize) {
// TODO(al): add flexibility about re-writing some entries
slog.ErrorContext(ctx, "Returning conflict", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize), slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd))
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
return curIntegratedSize, pendingCP.Size, ticketBytes, nil, ErrConflict
}

bi := func(yield func(api.EntryBundle) bool) {
Expand All @@ -262,31 +277,79 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
}
}

// TODO(al): Check uploadStart is aligned to EntryBundleWidth.
bundleIdx := uploadStart / layout.EntryBundleWidth

nextEntry, newRoot, err := mt.writer.IntegrateBundles(ctx, bundleIdx, bi)
switch {
case err != nil:
return 0, 0, nil, nil, err
case nextEntry == pendingSize:
if !bytes.Equal(pendingRoot, newRoot) {
case nextEntry == pendingCP.Size:
if !bytes.Equal(pendingCP.Hash, newRoot) {
slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(ticketCP)))
return 0, 0, nil, nil, errors.New("internal error")
}

// This is a complete upload.
// TODO(al):
// - cosign the pending checkpoint,
// - publish it IFF we not overwriting a larger checkpoint
// - If published, then return the cosig(s) to the caller.
return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil
case nextEntry > pendingSize:
// Ticket is stale, update the ticket
return nextEntry, pendingSize, ticketBytes, nil, nil
sigs, pubSize, err := mt.publishCheckpoint(ctx, pendingCP, pendingNote)
if err != nil {
return nextEntry, pubSize, nil, nil, fmt.Errorf("publishCheckpoint %w", err) // %w as we may need to signal ErrConflict.
}

slog.WarnContext(ctx, "Completed upload", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size), slog.String("sigs", string(sigs)))
return nextEntry, pendingCP.Size, ticketBytes, sigs, nil
case nextEntry > pendingCP.Size:
// TODO(al): ticket is stale, probably need to update the ticket?
slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size))
return nextEntry, pendingCP.Size, ticketBytes, nil, nil
default:
slog.WarnContext(ctx, "Incomplete upload", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size))
// Incomplete upload, return an updated ticket with the current checkpoint.
return nextEntry, pendingSize, ticketBytes, nil, nil
return nextEntry, pendingCP.Size, ticketBytes, nil, nil
}
}

// publishCheckpoint attempts to sign and atomically publish the provided checkpoint.
func (mt *MirrorTarget) publishCheckpoint(ctx context.Context, newCP *log.Checkpoint, cpNote *note.Note) ([]byte, uint64, error) {
var retSigs []byte
retSize := newCP.Size

if err := mt.writer.UpdateCheckpoint(ctx, func(oldCP []byte) ([]byte, error) {
// SPEC: Finally, the mirror performs the following steps atomically. Note the mirror
// checkpoint may have changed since the start of this process.
// - Check if upload_end is still greater than or equal to the mirror checkpoint's tree size.
// - If so, update the mirror checkpoint to the pending checkpoint of size upload_end.
// If upload_end was too small, the mirror MUST respond with a "409 Conflict" HTTP status
// code, [with approriate response body].

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// code, [with approriate response body].
// code, [with appropriate response body].

// Otherwise, if the mirror checkpoint was updated, the mirror MUST respond with a "200 Success"
// HTTP status code. The response body MUST be formatted as in a witness's successful add-checkpoint
// response: a sequence of one or more note signature lines.
if len(oldCP) > 0 {
publishedCP, _, _, err := log.ParseCheckpoint(oldCP, mt.logVerifier.Name(), mt.logVerifier)
if err != nil {
return nil, fmt.Errorf("failed to parse published checkpoint: %v", err)
}
if publishedCP.Origin != newCP.Origin {
return nil, fmt.Errorf("published CP origin %q != new CP origin %q", publishedCP.Origin, newCP.Origin)
}
if publishedCP.Size > newCP.Size {
// Return the larger size so that the caller re-syncs.
retSize = publishedCP.Size
return nil, ErrConflict
}
}
signedNewCPRaw, sigs, err := signNote(cpNote, mt.signer)
if err != nil {
return nil, fmt.Errorf("failed to cosign pending checkpoint: %v", err)
}

retSigs = sigs

return signedNewCPRaw, nil
}); err != nil {
return nil, retSize, fmt.Errorf("failed to update checkpoint: %v", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use %w for ErrConflict.

Suggested change
return nil, retSize, fmt.Errorf("failed to update checkpoint: %v", err)
return nil, retSize, fmt.Errorf("failed to update checkpoint: %w", err)

}

return retSigs, retSize, nil
}

// IntegratedSize returns the size of the current integrated log.
Expand Down Expand Up @@ -315,3 +378,62 @@ func (mt *MirrorTarget) openTicket(ticketBytes []byte) ([]byte, error) {
}
return pendingCP, nil
}

func signNote(n *note.Note, signers ...note.Signer) ([]byte, []byte, error) {
// Code below is a lightly tweaked snippet from sumdb/note/note.go
// https://cs.opensource.google/go/x/mod/+/refs/tags/v0.24.0:sumdb/note/note.go;l=625-649

// Prepare signatures.
//
// We need to return both a full serialised signed note, as well as the just the
// signature lines we're adding - this is because we want to _store_ the full note, but
// the tlog-witness API requires that we only return the signature lines.
//
// Rather than using note.Sign, then running note.Open in order to get access to our
// signatures, we'll instead use our note.Signer(s) directly to sign the note message
// and then use the returned signature bytes to create both the serialised signed note
// as well as the serialised signature lines.

var sigs = bytes.Buffer{}
for _, s := range signers {
name := s.Name()
hash := s.KeyHash()
if !isValidSignerName(name) {
return nil, nil, errors.New("invalid signer")
}

sig, err := s.Sign([]byte(n.Text))
if err != nil {
return nil, nil, err
}

// Create serialised signature line and append it to our sigs buffer:
var hbuf [4]byte
binary.BigEndian.PutUint32(hbuf[:], hash)
b64 := base64.StdEncoding.EncodeToString(append(hbuf[:], sig...))
sigs.WriteString("— ")
sigs.WriteString(name)
sigs.WriteString(" ")
sigs.WriteString(b64)
sigs.WriteString("\n")

// Also create a new note.Signature and pop it into the note's Sigs list (this will cause
// the signature to be present in the output when we call note.Sign below.
n.Sigs = append(n.Sigs, note.Signature{Name: name, Hash: hash, Base64: b64})
}
// Serialise the full signed note by calling Sign.
// Note that we're not passing any signers here because we've already added signatures in the loop above, so
// this call becomes just a serialisation function.
signed, err := note.Sign(n)
if err != nil {
return nil, nil, err
}

return signed, sigs.Bytes(), nil
}

// isValiSignerdName reports whether name is valid.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// isValiSignerdName reports whether name is valid.
// isValidSignerName reports whether name is valid.

// It must be non-empty and not have any Unicode spaces or pluses.
func isValidSignerName(name string) bool {
return name != "" && utf8.ValidString(name) && strings.IndexFunc(name, unicode.IsSpace) < 0 && !strings.Contains(name, "+")
}
Loading
Loading