Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/mtc/mirror/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func addEntries(m *MirrorMux) http.HandlerFunc {
}()
default:
// Log unknown encodings and treat as malformed request
slog.WarnContext(r.Context(), "Unknown Content-Encoding", slog.String("encoding", ce))
slog.DebugContext(r.Context(), "Unknown Content-Encoding", slog.String("encoding", ce))
http.Error(w, "Unsupported content encoding", http.StatusBadRequest)
return
}
req, err := parseAddEntriesPreamble(reader)
if err != nil {
http.Error(w, "Failed to parse request preamble", http.StatusBadRequest)
slog.DebugContext(r.Context(), "Failed to parse request preamble", slog.Any("err", err))
http.Error(w, "Invalid request preamble", http.StatusBadRequest)
return
}

Expand All @@ -108,7 +109,7 @@ func addEntries(m *MirrorMux) http.HandlerFunc {
return

case err != nil:
slog.ErrorContext(r.Context(), "Failed to add entries", slog.String("origin", req.logOrigin), slog.Any("err", err))
slog.DebugContext(r.Context(), "Failed to add entries", slog.String("origin", req.logOrigin), slog.Any("err", err))
http.Error(w, "Failed to add entries", http.StatusInternalServerError)
return

Expand Down
10 changes: 7 additions & 3 deletions cmd/mtc/mirror/posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ var (
witnessCosignerPath = flag.String("witness_cosigner_path", "", "The path to the note-formatted witness cosigner secret key.")
mirrorCosignerPath = flag.String("mirror_cosigner_path", "", "The path to the note-formatted mirror cosigner secret key.")
mirrorConfigPath = flag.String("config_path", "", "The path to the mirror configuration file.")
slogLevel = flag.Int("slog_level", 0, "The cut-off threshold for structured logging.")
)

func main() {
flag.Parse()
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, nil)))
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.Level(*slogLevel)})))
ctx := context.Background()

w, wp, shutdown := witnessFromFlags(ctx)
Expand All @@ -75,7 +76,7 @@ func main() {
origin := v.Name()

// Create the mirror
t, err := newMirrorTarget(ctx, w, origin, mirrorCosigner)
t, err := newMirrorTarget(ctx, w, v, mirrorCosigner)
if err != nil {
slog.ErrorContext(ctx, "Failed to create mirror target", slog.String("origin", origin), slog.Any("error", err))
os.Exit(1)
Expand Down Expand Up @@ -107,7 +108,9 @@ func main() {
//
// The target directory for the driver is derived from the storage directory and the origin in accordance
// with the `tlog-mirror` spec, allowing the root of the storage directory to be exported directly to read-only clients.
func newMirrorTarget(ctx context.Context, w *witness.Witness, origin string, mirrorSigner note.Signer) (*tessera.MirrorTarget, error) {
func newMirrorTarget(ctx context.Context, w *witness.Witness, logVerifier note.Verifier, mirrorSigner note.Signer) (*tessera.MirrorTarget, error) {
origin := logVerifier.Name()

targetDir := filepath.Join(*storageDir, url.PathEscape(origin))
if err := os.MkdirAll(targetDir, 0o755); err != nil {
return nil, fmt.Errorf("mkdir %q: %v", targetDir, err)
Expand All @@ -120,6 +123,7 @@ func newMirrorTarget(ctx context.Context, w *witness.Witness, origin string, mir
WithCheckpointSource(func(ctx context.Context) ([]byte, error) {
return w.GetCheckpoint(ctx, origin)
}).
WithLogVerifier(logVerifier).
WithSigner(mirrorSigner)
return tessera.NewMirrorTarget(ctx, d, mOpts)
}
Expand Down
133 changes: 85 additions & 48 deletions mirror_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package tessera
import (
"bytes"
"context"
"encoding/gob"
"crypto/hkdf"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -42,15 +45,22 @@ var (

// MirrorOptions holds mirror lifecycle settings for all storage implementations.
type MirrorOptions struct {
signer note.Signer
cpSource func(context.Context) ([]byte, error)
signer note.Signer
cpSource func(context.Context) ([]byte, error)
logVerifier note.Verifier
}

// NewMirrorOptions creates a new options struct with defaults.
func NewMirrorOptions() *MirrorOptions {
return &MirrorOptions{}
}

// WithLogVerifier sets the note.Verifier used to verify log checkpoint signatures.
func (o *MirrorOptions) WithLogVerifier(v note.Verifier) *MirrorOptions {
o.logVerifier = v
return o
}

// WithSigner configures the note.Signer to use when cosigning checkpoints.
func (o *MirrorOptions) WithSigner(s note.Signer) *MirrorOptions {
o.signer = s
Expand All @@ -76,6 +86,9 @@ func (o *MirrorOptions) LeafHasher() func(bundle []byte) (leafHashes [][]byte, e
}

func (o *MirrorOptions) valid() error {
if o.logVerifier == nil {
return errors.New("invalid MirrorOptions: WithLogVerifier must be set")
}
if o.signer == nil {
return errors.New("invalid MirrorOptions: WithSigner must be set")
}
Expand All @@ -97,10 +110,11 @@ type MirrorWriter interface {
// MirrorTarget is a high-level wrapper that manages the process of mirroring
// a source log into a Tessera instance.
type MirrorTarget struct {
writer MirrorWriter
reader LogReader
cpSource func(context.Context) ([]byte, error)
signer note.Signer
writer MirrorWriter
reader LogReader
cpSource func(context.Context) ([]byte, error)
signer note.Signer
ticketKey []byte
}

// NewMirrorTarget instantiates a new MirrorTarget for the given driver and options.
Expand All @@ -122,14 +136,41 @@ func NewMirrorTarget(ctx context.Context, d Driver, opts *MirrorOptions) (*Mirro
if err != nil {
return nil, fmt.Errorf("failed to init MirrorTarget lifecycle: %v", err)
}
tK, err := ticketKey(opts.signer.Name(), opts.logVerifier.Name())
if err != nil {
return nil, fmt.Errorf("failed to derive ticket key: %v", err)
}
return &MirrorTarget{
writer: mw,
reader: r,
cpSource: opts.cpSource,
signer: opts.signer,
writer: mw,
reader: r,
cpSource: opts.cpSource,
signer: opts.signer,
ticketKey: tK,
}, nil
}

// ticketKey derives a unique HMAC key for sealing tickets based on:
// - An ephemeral seed,
// - Identity (origin) of the mirror cosigner,
// - Identity (origin) of the log being mirrored.
//
// It should be called, once, at startup to set the ticket MAC key for the mirror.
//
// TODO(al): We should allow the operator to pass in the seed, so that tickets
// will work across multiple mirror instances and/or restarts.
func ticketKey(mirrorOrigin, logOrigin string) ([]byte, error) {
seed := make([]byte, sha256.Size)
if _, err := rand.Read(seed); err != nil {
return nil, fmt.Errorf("failed to generate ephemeral seed: %v", err)
}
// This salt will keep the key unique per mirror, even if the random seed generation above
// were changed to be a "fixed" value provided by the operator.
salt := sha256.Sum256(fmt.Appendf(nil, "mirror:\n%s\n", mirrorOrigin))
// Bind this key to its usage for MACing tickets for the given log.
info := fmt.Sprintf("ticket-hmac\nlog:\n%s\n", logOrigin)
return hkdf.Key(sha256.New, seed, salt[:], info, sha256.Size)
}

// Package represents a single package of entries and its subtree consistency proof.
type MirrorPackage struct {
Entries [][]byte
Expand All @@ -147,42 +188,43 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err)
}
var t *ticket
if t, err = mt.openTicket(ctx, ticketBytes); err != nil {
// Invalid or empty ticket, return a new one.
ticketCP, err := mt.openTicket(ticketBytes)
if err != nil {
slog.DebugContext(ctx, "Invalid Ticket received, returning new one", slog.Any("error", err), slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd))

// If the client didn't provide a [valid] ticket, then we don't have a pending
// checkpoint to validate against, so we return a new ticket with the
// current checkpoint.
pendingCP, err := mt.cpSource(ctx)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err)
}
if len(pendingCP) == 0 {
return 0, 0, nil, nil, ErrNoPendingCheckpoint
}
t = &ticket{
PendingCP: pendingCP,
}
ticketBytes, err = mt.sealTicket(ctx, t)
ticketBytes, err = mt.sealTicket(pendingCP)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err)
}

// If the client didn't provide a [valid] ticket, then we don't have a pending
// checkpoint to validate against, so we return a new ticket with the
// current checkpoint.
_, pendingSize, _, err := parse.CheckpointUnsafe(t.PendingCP)
_, pendingSize, _, err := parse.CheckpointUnsafe(pendingCP)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(pendingCP)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err)
}

slog.DebugContext(ctx, "Returning new ticket", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize))
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
}

var pendingRoot []byte
_, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(t.PendingCP)
_, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(ticketCP)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(ticketCP)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err)
}

slog.DebugContext(ctx, "Valid ticket, proceeding", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize))
// Handle 409 Conflicts:
// - Zero-request check: If upload_start == 0 and upload_end == 0, the client is
// requesting initial mirror information.
Expand All @@ -196,6 +238,7 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
(uploadEnd != pendingSize || uploadEnd < curIntegratedSize) ||
(uploadStart > curIntegratedSize) {
// TODO(al): add flexibility about re-writing some entries
slog.ErrorContext(ctx, "Returning conflict", slog.Uint64("curIntegratedSize", curIntegratedSize), slog.Uint64("pendingSize", pendingSize), slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd))
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
}

Expand Down Expand Up @@ -228,7 +271,7 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
return 0, 0, nil, nil, err
case nextEntry == pendingSize:
if !bytes.Equal(pendingRoot, newRoot) {
slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(t.PendingCP)))
slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(ticketCP)))
return 0, 0, nil, nil, errors.New("internal error")
}
// This is a complete upload.
Expand All @@ -238,8 +281,7 @@ func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd u
// - If published, then return the cosig(s) to the caller.
return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil
case nextEntry > pendingSize:
// TODO(al): ticket is stale, probably need to update the ticket?
slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingSize))
// Ticket is stale, update the ticket
return nextEntry, pendingSize, ticketBytes, nil, nil
default:
// Incomplete upload, return an updated ticket with the current checkpoint.
Expand All @@ -252,29 +294,24 @@ func (mt *MirrorTarget) IntegratedSize(ctx context.Context) (uint64, error) {
return mt.reader.IntegratedSize(ctx)
}

// ticket is the underlying structure of an add-entries ticket.
type ticket struct {
// PendingCP holds the raw pending checkpoint bytes.
PendingCP []byte
func (mt *MirrorTarget) sealTicket(pendingCP []byte) ([]byte, error) {
h := hmac.New(sha256.New, mt.ticketKey)
h.Write(pendingCP)
mac := h.Sum(nil)
return append(mac, pendingCP...), nil
}

func (mt *MirrorTarget) sealTicket(ctx context.Context, t *ticket) ([]byte, error) {
out := bytes.Buffer{}
if err := gob.NewEncoder(&out).Encode(t); err != nil {
return nil, fmt.Errorf("ticket encoding failed: %v", err)
func (mt *MirrorTarget) openTicket(ticketBytes []byte) ([]byte, error) {
if len(ticketBytes) < sha256.Size {
return nil, errors.New("invalid ticket")
}
// TODO(al): harden ticket & bind to this particular log mirror.
return out.Bytes(), nil
}

func (mt *MirrorTarget) openTicket(ctx context.Context, ticketBytes []byte) (*ticket, error) {
if len(ticketBytes) == 0 {
return nil, errors.New("empty ticket")
}
// TODO(al): harden ticket & verify it's for this particular log mirror.
var t ticket
if err := gob.NewDecoder(bytes.NewReader(ticketBytes)).Decode(&t); err != nil {
return nil, fmt.Errorf("ticket decoding failed: %v", err)
mac, pendingCP := ticketBytes[:sha256.Size], ticketBytes[sha256.Size:]

h := hmac.New(sha256.New, mt.ticketKey)
h.Write(pendingCP)
if !hmac.Equal(mac, h.Sum(nil)) {
return nil, errors.New("invalid ticketMAC")
}
return &t, nil
return pendingCP, nil
}
Loading
Loading