-
Notifications
You must be signed in to change notification settings - Fork 48
Sign and publish checkpoints #1023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
AlCutter
merged 1 commit into
transparency-dev:main
from
AlCutter:sign_and_publish_checkpoints
Jun 26, 2026
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -21,16 +21,21 @@ import ( | |||||
| "crypto/hmac" | ||||||
| "crypto/rand" | ||||||
| "crypto/sha256" | ||||||
| "encoding/base64" | ||||||
| "encoding/binary" | ||||||
| "encoding/hex" | ||||||
| "errors" | ||||||
| "fmt" | ||||||
| "io" | ||||||
| "iter" | ||||||
| "log/slog" | ||||||
| "strings" | ||||||
| "unicode" | ||||||
| "unicode/utf8" | ||||||
|
|
||||||
| "github.com/transparency-dev/formats/log" | ||||||
| "github.com/transparency-dev/tessera/api" | ||||||
| "github.com/transparency-dev/tessera/api/layout" | ||||||
| "github.com/transparency-dev/tessera/internal/parse" | ||||||
| "golang.org/x/mod/sumdb/note" | ||||||
| ) | ||||||
|
|
||||||
|
|
@@ -105,16 +110,23 @@ type MirrorWriter interface { | |||||
| IntegrateBundles(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) | ||||||
| // IntegratedSize returns the size of the local integrated tree. | ||||||
| IntegratedSize(ctx context.Context) (uint64, error) | ||||||
|
|
||||||
| // UpdateCheckpoint MUST atomically update the local published checkpoint for the log mirror. | ||||||
| // The provided function f should be called with the contents of the currently published checkpoint, | ||||||
| // this will be of zero length if there is no currently published checkpoint, and should | ||||||
| // return the new serialised checkpoint or an error. If the function returns an error, the currently | ||||||
| // published checkpoint MUST NOT be altered. | ||||||
| UpdateCheckpoint(ctx context.Context, f func(oldCP []byte) (newCP []byte, err error)) error | ||||||
| } | ||||||
|
|
||||||
| // MirrorTarget is a high-level wrapper that manages the process of mirroring | ||||||
| // a source log into a Tessera instance. | ||||||
| // MirrorTarget manages the process of mirroring a source log into a Tessera instance. | ||||||
| type MirrorTarget struct { | ||||||
| writer MirrorWriter | ||||||
| reader LogReader | ||||||
| cpSource func(context.Context) ([]byte, error) | ||||||
| signer note.Signer | ||||||
| ticketKey []byte | ||||||
| writer MirrorWriter | ||||||
| reader LogReader | ||||||
| cpSource func(context.Context) ([]byte, error) | ||||||
| signer note.Signer | ||||||
| logVerifier note.Verifier | ||||||
| ticketKey []byte | ||||||
| } | ||||||
|
|
||||||
| // NewMirrorTarget instantiates a new MirrorTarget for the given driver and options. | ||||||
|
|
@@ -141,11 +153,12 @@ func NewMirrorTarget(ctx context.Context, d Driver, opts *MirrorOptions) (*Mirro | |||||
| return nil, fmt.Errorf("failed to derive ticket key: %v", err) | ||||||
| } | ||||||
| return &MirrorTarget{ | ||||||
| writer: mw, | ||||||
| reader: r, | ||||||
| cpSource: opts.cpSource, | ||||||
| signer: opts.signer, | ||||||
| ticketKey: tK, | ||||||
| writer: mw, | ||||||
| reader: r, | ||||||
| cpSource: opts.cpSource, | ||||||
| signer: opts.signer, | ||||||
| logVerifier: opts.logVerifier, | ||||||
| ticketKey: tK, | ||||||
| }, nil | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -184,6 +197,8 @@ type MirrorPackage struct { | |||||
| // ticket for future invocations, and, optionally, a cosignature over a pending checkpoint | ||||||
| // whose size matches uploadEnd if one exists. | ||||||
| func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticketBytes []byte, next func() (*MirrorPackage, error)) (nextEntry uint64, pendingSize uint64, newTicket []byte, cosigs []byte, err error) { | ||||||
| // TODO(al) handle non-aligned uploadStart. | ||||||
|
|
||||||
| curIntegratedSize, err := mt.reader.IntegratedSize(ctx) | ||||||
| if err != nil { | ||||||
| return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err) | ||||||
|
|
@@ -195,36 +210,36 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u | |||||
| // If the client didn't provide a [valid] ticket, then we don't have a pending | ||||||
| // checkpoint to validate against, so we return a new ticket with the | ||||||
| // current checkpoint. | ||||||
| pendingCP, err := mt.cpSource(ctx) | ||||||
| pendingCPRaw, err := mt.cpSource(ctx) | ||||||
| if err != nil { | ||||||
| return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err) | ||||||
| } | ||||||
| if len(pendingCP) == 0 { | ||||||
| if len(pendingCPRaw) == 0 { | ||||||
| return 0, 0, nil, nil, ErrNoPendingCheckpoint | ||||||
| } | ||||||
| ticketBytes, err = mt.sealTicket(pendingCP) | ||||||
| ticketBytes, err = mt.sealTicket(pendingCPRaw) | ||||||
| if err != nil { | ||||||
| return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err) | ||||||
| } | ||||||
|
|
||||||
| _, pendingSize, _, err := parse.CheckpointUnsafe(pendingCP) | ||||||
| pendingCP, _, _, err := log.ParseCheckpoint(pendingCPRaw, mt.logVerifier.Name(), mt.logVerifier) | ||||||
| if err != nil { | ||||||
| slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(pendingCP)), slog.String("error", err.Error())) | ||||||
| slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(pendingCPRaw)), slog.String("error", err.Error())) | ||||||
| return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err) | ||||||
| } | ||||||
|
|
||||||
| slog.DebugContext(ctx, "Returning new ticket", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize)) | ||||||
| return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict | ||||||
| slog.DebugContext(ctx, "Returning new ticket", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingCP.Size)) | ||||||
|
|
||||||
| return curIntegratedSize, pendingCP.Size, ticketBytes, nil, ErrConflict | ||||||
| } | ||||||
|
|
||||||
| var pendingRoot []byte | ||||||
| _, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(ticketCP) | ||||||
| pendingCP, _, pendingNote, err := log.ParseCheckpoint(ticketCP, mt.logVerifier.Name(), mt.logVerifier) | ||||||
| if err != nil { | ||||||
| slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(ticketCP)), slog.String("error", err.Error())) | ||||||
| return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err) | ||||||
| } | ||||||
|
|
||||||
| slog.DebugContext(ctx, "Valid ticket, proceeding", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize)) | ||||||
| slog.DebugContext(ctx, "Valid ticket, proceeding", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingCP.Size)) | ||||||
| // Handle 409 Conflicts: | ||||||
| // - Zero-request check: If upload_start == 0 and upload_end == 0, the client is | ||||||
| // requesting initial mirror information. | ||||||
|
|
@@ -235,11 +250,11 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u | |||||
| // * MUST NOT be greater than the mirror's next expected entry index. | ||||||
| // * MUST NOT be too far below the mirror's next entry index. | ||||||
| if (uploadStart == 0 && uploadEnd == 0) || | ||||||
| (uploadEnd != pendingSize || uploadEnd < curIntegratedSize) || | ||||||
| (uploadEnd != pendingCP.Size || uploadEnd < curIntegratedSize) || | ||||||
| (uploadStart > curIntegratedSize) { | ||||||
| // TODO(al): add flexibility about re-writing some entries | ||||||
| slog.ErrorContext(ctx, "Returning conflict", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize), slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd)) | ||||||
| return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict | ||||||
| return curIntegratedSize, pendingCP.Size, ticketBytes, nil, ErrConflict | ||||||
| } | ||||||
|
|
||||||
| bi := func(yield func(api.EntryBundle) bool) { | ||||||
|
|
@@ -262,31 +277,79 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| // TODO(al): Check uploadStart is aligned to EntryBundleWidth. | ||||||
| bundleIdx := uploadStart / layout.EntryBundleWidth | ||||||
|
|
||||||
| nextEntry, newRoot, err := mt.writer.IntegrateBundles(ctx, bundleIdx, bi) | ||||||
| switch { | ||||||
| case err != nil: | ||||||
| return 0, 0, nil, nil, err | ||||||
| case nextEntry == pendingSize: | ||||||
| if !bytes.Equal(pendingRoot, newRoot) { | ||||||
| case nextEntry == pendingCP.Size: | ||||||
| if !bytes.Equal(pendingCP.Hash, newRoot) { | ||||||
| slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(ticketCP))) | ||||||
| return 0, 0, nil, nil, errors.New("internal error") | ||||||
| } | ||||||
|
|
||||||
| // This is a complete upload. | ||||||
| // TODO(al): | ||||||
| // - cosign the pending checkpoint, | ||||||
| // - publish it IFF we not overwriting a larger checkpoint | ||||||
| // - If published, then return the cosig(s) to the caller. | ||||||
| return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil | ||||||
| case nextEntry > pendingSize: | ||||||
| // Ticket is stale, update the ticket | ||||||
| return nextEntry, pendingSize, ticketBytes, nil, nil | ||||||
| sigs, pubSize, err := mt.publishCheckpoint(ctx, pendingCP, pendingNote) | ||||||
| if err != nil { | ||||||
| return nextEntry, pubSize, nil, nil, fmt.Errorf("publishCheckpoint %w", err) // %w as we may need to signal ErrConflict. | ||||||
| } | ||||||
|
|
||||||
| slog.WarnContext(ctx, "Completed upload", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size), slog.String("sigs", string(sigs))) | ||||||
| return nextEntry, pendingCP.Size, ticketBytes, sigs, nil | ||||||
| case nextEntry > pendingCP.Size: | ||||||
| // TODO(al): ticket is stale, probably need to update the ticket? | ||||||
| slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size)) | ||||||
| return nextEntry, pendingCP.Size, ticketBytes, nil, nil | ||||||
| default: | ||||||
| slog.WarnContext(ctx, "Incomplete upload", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingCP.Size)) | ||||||
| // Incomplete upload, return an updated ticket with the current checkpoint. | ||||||
| return nextEntry, pendingSize, ticketBytes, nil, nil | ||||||
| return nextEntry, pendingCP.Size, ticketBytes, nil, nil | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // publishCheckpoint attempts to sign and atomically publish the provided checkpoint. | ||||||
| func (mt *MirrorTarget) publishCheckpoint(ctx context.Context, newCP *log.Checkpoint, cpNote *note.Note) ([]byte, uint64, error) { | ||||||
| var retSigs []byte | ||||||
| retSize := newCP.Size | ||||||
|
|
||||||
| if err := mt.writer.UpdateCheckpoint(ctx, func(oldCP []byte) ([]byte, error) { | ||||||
| // SPEC: Finally, the mirror performs the following steps atomically. Note the mirror | ||||||
| // checkpoint may have changed since the start of this process. | ||||||
| // - Check if upload_end is still greater than or equal to the mirror checkpoint's tree size. | ||||||
| // - If so, update the mirror checkpoint to the pending checkpoint of size upload_end. | ||||||
| // If upload_end was too small, the mirror MUST respond with a "409 Conflict" HTTP status | ||||||
| // code, [with approriate response body]. | ||||||
| // Otherwise, if the mirror checkpoint was updated, the mirror MUST respond with a "200 Success" | ||||||
| // HTTP status code. The response body MUST be formatted as in a witness's successful add-checkpoint | ||||||
| // response: a sequence of one or more note signature lines. | ||||||
| if len(oldCP) > 0 { | ||||||
| publishedCP, _, _, err := log.ParseCheckpoint(oldCP, mt.logVerifier.Name(), mt.logVerifier) | ||||||
| if err != nil { | ||||||
| return nil, fmt.Errorf("failed to parse published checkpoint: %v", err) | ||||||
| } | ||||||
| if publishedCP.Origin != newCP.Origin { | ||||||
| return nil, fmt.Errorf("published CP origin %q != new CP origin %q", publishedCP.Origin, newCP.Origin) | ||||||
| } | ||||||
| if publishedCP.Size > newCP.Size { | ||||||
| // Return the larger size so that the caller re-syncs. | ||||||
| retSize = publishedCP.Size | ||||||
| return nil, ErrConflict | ||||||
| } | ||||||
| } | ||||||
| signedNewCPRaw, sigs, err := signNote(cpNote, mt.signer) | ||||||
| if err != nil { | ||||||
| return nil, fmt.Errorf("failed to cosign pending checkpoint: %v", err) | ||||||
| } | ||||||
|
|
||||||
| retSigs = sigs | ||||||
|
|
||||||
| return signedNewCPRaw, nil | ||||||
| }); err != nil { | ||||||
| return nil, retSize, fmt.Errorf("failed to update checkpoint: %v", err) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use
Suggested change
|
||||||
| } | ||||||
|
|
||||||
| return retSigs, retSize, nil | ||||||
| } | ||||||
|
|
||||||
| // IntegratedSize returns the size of the current integrated log. | ||||||
|
|
@@ -315,3 +378,62 @@ func (mt *MirrorTarget) openTicket(ticketBytes []byte) ([]byte, error) { | |||||
| } | ||||||
| return pendingCP, nil | ||||||
| } | ||||||
|
|
||||||
| func signNote(n *note.Note, signers ...note.Signer) ([]byte, []byte, error) { | ||||||
| // Code below is a lightly tweaked snippet from sumdb/note/note.go | ||||||
| // https://cs.opensource.google/go/x/mod/+/refs/tags/v0.24.0:sumdb/note/note.go;l=625-649 | ||||||
|
|
||||||
| // Prepare signatures. | ||||||
| // | ||||||
| // We need to return both a full serialised signed note, as well as the just the | ||||||
| // signature lines we're adding - this is because we want to _store_ the full note, but | ||||||
| // the tlog-witness API requires that we only return the signature lines. | ||||||
| // | ||||||
| // Rather than using note.Sign, then running note.Open in order to get access to our | ||||||
| // signatures, we'll instead use our note.Signer(s) directly to sign the note message | ||||||
| // and then use the returned signature bytes to create both the serialised signed note | ||||||
| // as well as the serialised signature lines. | ||||||
|
|
||||||
| var sigs = bytes.Buffer{} | ||||||
| for _, s := range signers { | ||||||
| name := s.Name() | ||||||
| hash := s.KeyHash() | ||||||
| if !isValidSignerName(name) { | ||||||
| return nil, nil, errors.New("invalid signer") | ||||||
| } | ||||||
|
|
||||||
| sig, err := s.Sign([]byte(n.Text)) | ||||||
| if err != nil { | ||||||
| return nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| // Create serialised signature line and append it to our sigs buffer: | ||||||
| var hbuf [4]byte | ||||||
| binary.BigEndian.PutUint32(hbuf[:], hash) | ||||||
| b64 := base64.StdEncoding.EncodeToString(append(hbuf[:], sig...)) | ||||||
| sigs.WriteString("— ") | ||||||
| sigs.WriteString(name) | ||||||
| sigs.WriteString(" ") | ||||||
| sigs.WriteString(b64) | ||||||
| sigs.WriteString("\n") | ||||||
|
|
||||||
| // Also create a new note.Signature and pop it into the note's Sigs list (this will cause | ||||||
| // the signature to be present in the output when we call note.Sign below. | ||||||
| n.Sigs = append(n.Sigs, note.Signature{Name: name, Hash: hash, Base64: b64}) | ||||||
| } | ||||||
| // Serialise the full signed note by calling Sign. | ||||||
| // Note that we're not passing any signers here because we've already added signatures in the loop above, so | ||||||
| // this call becomes just a serialisation function. | ||||||
| signed, err := note.Sign(n) | ||||||
| if err != nil { | ||||||
| return nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| return signed, sigs.Bytes(), nil | ||||||
| } | ||||||
|
|
||||||
| // isValiSignerdName reports whether name is valid. | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| // It must be non-empty and not have any Unicode spaces or pluses. | ||||||
| func isValidSignerName(name string) bool { | ||||||
| return name != "" && utf8.ValidString(name) && strings.IndexFunc(name, unicode.IsSpace) < 0 && !strings.Contains(name, "+") | ||||||
| } | ||||||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.