Skip to content

Commit 20980b0

Browse files
authored
Add POSIX mirror lifecycle outline & witness support (#989)
Add a skeleton POSIX storage implementation for the mirror lifecycle, and connect it up to the POSIX mirror binary.
1 parent 9dc3294 commit 20980b0

2 files changed

Lines changed: 142 additions & 22 deletions

File tree

cmd/mtc/mirror/posix/main.go

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@ import (
1919
"encoding/json"
2020
"errors"
2121
"flag"
22+
"fmt"
2223
"log/slog"
2324
"net/http"
2425
"os"
2526
"path/filepath"
27+
"net/url"
2628

2729
fnote "github.com/transparency-dev/formats/note"
2830
"github.com/transparency-dev/tessera"
2931
"github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/config"
32+
"github.com/transparency-dev/tessera/storage/posix"
3033
"github.com/transparency-dev/tessera/cmd/mtc/mirror/internal/handler"
3134
"github.com/transparency-dev/witness/omniwitness"
3235
"github.com/transparency-dev/witness/witness"
@@ -41,7 +44,8 @@ const (
4144
var (
4245
listenAddr = flag.String("listen_addr", ":8080", "The address to listen on for HTTP requests.")
4346
storageDir = flag.String("storage_dir", "", "Directory to store mirror data.")
44-
witnessSignerPath = flag.String("witness_signer_path", "", "The path to the note-formatted witness signer secret key.")
47+
witnessCosignerPath = flag.String("witness_cosigner_path", "", "The path to the note-formatted witness cosigner secret key.")
48+
mirrorCosignerPath = flag.String("mirror_cosigner_path", "", "The path to the note-formatted mirror cosigner secret key.")
4549
mirrorConfigPath = flag.String("config_path", "", "The path to the mirror configuration file.")
4650
)
4751

@@ -57,6 +61,8 @@ func main() {
5761
}
5862
}()
5963

64+
mirrorCosigner := mustCreateCosigner(ctx, *mirrorCosignerPath)
65+
6066
mux := handler.NewMirrorMux()
6167
cfg := mirrorConfigFromFlags(ctx)
6268
for _, l := range cfg.Logs {
@@ -65,11 +71,21 @@ func main() {
6571
slog.ErrorContext(ctx, "Failed to create verifier", slog.String("vkey", l.VKey), slog.Any("error", err))
6672
os.Exit(1)
6773
}
74+
6875
origin := v.Name()
69-
if err := mux.AddTarget(origin, &fakeTarget{}); err != nil {
70-
slog.ErrorContext(ctx, "Failed to add target", slog.String("origin", origin), slog.Any("error", err))
76+
77+
// Create the mirror
78+
t, err := newMirrorTarget(ctx, w, origin, mirrorCosigner)
79+
if err != nil {
80+
slog.ErrorContext(ctx, "Failed to create mirror target", slog.String("origin", origin), slog.Any("error", err))
81+
os.Exit(1)
82+
}
83+
if err := mux.AddTarget(origin, t); err != nil {
84+
slog.ErrorContext(ctx, "Failed to add target to mux", slog.String("origin", origin), slog.Any("error", err))
7185
os.Exit(1)
7286
}
87+
88+
// Ensure log is known by the witness
7389
if err := wp.AddLogs(ctx, []omniwitness.Log{
7490
{Origin: origin, VKey: l.VKey},
7591
}); err != nil {
@@ -87,6 +103,29 @@ func main() {
87103
}
88104
}
89105

106+
// newMirrorTarget creates a new POSIX driver and MirrorTarget for the given origin.
107+
//
108+
// The target directory for the driver is derived from the storage directory and the origin in accordance
109+
// with the `tlog-mirror` spec, allowing the root of the storage directory to be exported directly to read-only clients.
110+
func newMirrorTarget(ctx context.Context, w *witness.Witness, origin string, mirrorSigner note.Signer) (*tessera.MirrorTarget, error) {
111+
targetDir := filepath.Join(*storageDir, url.PathEscape(origin))
112+
if err := os.MkdirAll(targetDir, 0o755); err != nil {
113+
return nil, fmt.Errorf("mkdir %q: %v", targetDir, err)
114+
}
115+
d, err := posix.New(ctx, posix.Config{Path: targetDir})
116+
if err != nil {
117+
return nil, fmt.Errorf("posix.New: %v", err)
118+
}
119+
mOpts := tessera.NewMirrorOptions().
120+
WithCheckpointSource(func(ctx context.Context) ([]byte, error) {
121+
return w.GetCheckpoint(ctx, origin)
122+
}).
123+
WithSigner(mirrorSigner)
124+
return tessera.NewMirrorTarget(ctx, d, mOpts)
125+
}
126+
127+
// mirrorConfigFromFlags returns a mirror configuration loaded from the provided flags.
128+
// Exits if the mirror configuration could not be loaded.
90129
func mirrorConfigFromFlags(ctx context.Context) config.Config {
91130
if *mirrorConfigPath == "" {
92131
slog.ErrorContext(ctx, "Mirror config path not specified")
@@ -131,7 +170,7 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc
131170

132171
w, err := witness.New(ctx, witness.Opts{
133172
Persistence: p,
134-
Signers: witnessSignerFromFlags(ctx),
173+
Signers: witnessCosignerFromFlags(ctx),
135174
VerifierForLog: func(ctx context.Context, origin string) (note.Verifier, bool, error) {
136175
log, ok, err := p.Log(ctx, origin)
137176
if err != nil || !ok {
@@ -147,30 +186,30 @@ func witnessFromFlags(ctx context.Context) (*witness.Witness, *sqlite.Persistenc
147186
return w, p, shutdown
148187
}
149188

150-
// fakeTarget is a temporary mirror target impl, and will be removed in due course.
151-
type fakeTarget struct {}
152-
153-
func (f fakeTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*tessera.MirrorPackage, error)) (nextIdx uint64, curSize uint64, newTicket []byte, cosigs []byte, err error) {
154-
slog.InfoContext(ctx, "fake target: AddEntries", slog.Uint64("uploadStart", uploadStart), slog.Uint64("uploadEnd", uploadEnd))
155-
return uploadEnd, 0, nil, nil, nil
156-
}
157-
158-
func witnessSignerFromFlags(ctx context.Context) []note.Signer {
159-
if *witnessSignerPath == "" {
189+
func witnessCosignerFromFlags(ctx context.Context) []note.Signer {
190+
if *witnessCosignerPath == "" {
160191
slog.WarnContext(ctx, "Witness cosigner not configured, add-checkpoint will not return cosigs")
161192
return []note.Signer{}
162193
}
163-
r, err := os.ReadFile(*witnessSignerPath)
194+
return []note.Signer{mustCreateCosigner(ctx, *witnessCosignerPath)}
195+
}
196+
197+
func mustCreateCosigner(ctx context.Context, path string) note.Signer {
198+
if path == "" {
199+
slog.ErrorContext(ctx, "Cosigner key path not specified")
200+
os.Exit(1)
201+
}
202+
r, err := os.ReadFile(path)
164203
if err != nil {
165-
slog.ErrorContext(ctx, "Failed to read witness cosigner file", slog.String("path", *witnessSignerPath), slog.Any("error", err))
204+
slog.ErrorContext(ctx, "Failed to read cosigner key", slog.String("path", path), slog.Any("error", err))
166205
os.Exit(1)
167206
}
168207
s, err := fnote.NewSignerForCosignatureV1(string(r))
169208
if err != nil {
170-
slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", *witnessSignerPath), slog.Any("error", err))
209+
slog.ErrorContext(ctx, "Failed to create cosigner", slog.String("path", path), slog.Any("error", err))
171210
os.Exit(1)
172211
}
173-
return []note.Signer{s}
212+
return s
174213
}
175214

176215

storage/posix/files.go

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"io/fs"
23+
"iter"
2324
"net/http"
2425
"os"
2526
"path/filepath"
@@ -313,6 +314,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
313314
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
314315
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
315316
a.s.mu.Lock()
317+
defer a.s.mu.Unlock()
316318
unlock, err := a.s.lockFile(ctx, treeStateLock)
317319
if err != nil {
318320
panic(err)
@@ -321,7 +323,6 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
321323
if err := unlock(); err != nil {
322324
panic(err)
323325
}
324-
a.s.mu.Unlock()
325326
}()
326327

327328
size, _, err := a.s.readTreeState(ctx)
@@ -538,6 +539,7 @@ func (a *appender) initialise(ctx context.Context) error {
538539
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
539540
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
540541
a.s.mu.Lock()
542+
defer a.s.mu.Unlock()
541543
unlock, err := a.s.lockFile(ctx, treeStateLock)
542544
if err != nil {
543545
panic(err)
@@ -546,7 +548,6 @@ func (a *appender) initialise(ctx context.Context) error {
546548
if err := unlock(); err != nil {
547549
panic(err)
548550
}
549-
a.s.mu.Unlock()
550551
}()
551552

552553
if err := a.s.ensureVersion(compatibilityVersion); err != nil {
@@ -996,6 +997,7 @@ func (m *MigrationStorage) initialise(ctx context.Context) error {
996997
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
997998
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
998999
m.s.mu.Lock()
1000+
defer m.s.mu.Unlock()
9991001
unlock, err := m.s.lockFile(ctx, treeStateLock)
10001002
if err != nil {
10011003
panic(err)
@@ -1004,7 +1006,6 @@ func (m *MigrationStorage) initialise(ctx context.Context) error {
10041006
if err := unlock(); err != nil {
10051007
panic(err)
10061008
}
1007-
m.s.mu.Unlock()
10081009
}()
10091010

10101011
if err := m.s.ensureVersion(compatibilityVersion); err != nil {
@@ -1041,6 +1042,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
10411042
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
10421043
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
10431044
m.s.mu.Lock()
1045+
defer m.s.mu.Unlock()
10441046
unlock, err := m.s.lockFile(ctx, treeStateLock)
10451047
if err != nil {
10461048
panic(err)
@@ -1049,7 +1051,6 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
10491051
if err := unlock(); err != nil {
10501052
panic(err)
10511053
}
1052-
m.s.mu.Unlock()
10531054
}()
10541055

10551056
size, _, err := m.s.readTreeState(ctx)
@@ -1110,3 +1111,83 @@ func (m *MigrationStorage) fetchLeafHashes(ctx context.Context, from, to, source
11101111
}
11111112
return lh, nil
11121113
}
1114+
1115+
1116+
// MirrorWriter creates a new POSIX storage for the MirrorTarget lifecycle mode.
1117+
func (s *Storage) MirrorWriter(ctx context.Context, opts *tessera.MirrorOptions) (tessera.MirrorWriter, tessera.LogReader, error) {
1118+
r := &MirrorWriter{
1119+
s: s,
1120+
logStorage: &logResourceStorage{
1121+
entriesPath: opts.EntriesPath(),
1122+
s: s,
1123+
},
1124+
bundleHasher: opts.LeafHasher(),
1125+
}
1126+
if err := r.initialise(ctx); err != nil {
1127+
return nil, nil, err
1128+
}
1129+
return r, r.logStorage, nil
1130+
}
1131+
1132+
// MirrorWriter implements the tessera.MirrorWriter lifecycle contract.
1133+
type MirrorWriter struct {
1134+
s *Storage
1135+
logStorage *logResourceStorage
1136+
bundleHasher func(entryBundle []byte) ([][]byte, error)
1137+
curSize uint64
1138+
}
1139+
1140+
var _ tessera.MirrorWriter = &MirrorWriter{}
1141+
1142+
func (m *MirrorWriter) initialise(ctx context.Context) error {
1143+
// Idempotent: If folder exists, nothing happens.
1144+
if err := mkdirAll(filepath.Join(m.s.cfg.Path, stateDir), dirPerm); err != nil {
1145+
return fmt.Errorf("failed to create log directory: %q", err)
1146+
}
1147+
// Double locking:
1148+
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
1149+
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
1150+
m.s.mu.Lock()
1151+
defer m.s.mu.Unlock()
1152+
unlock, err := m.s.lockFile(ctx, treeStateLock)
1153+
if err != nil {
1154+
// Oh dear, panic Mr Mainwaring!
1155+
panic(fmt.Errorf("failed to lock tree state: %v", err))
1156+
}
1157+
defer func() {
1158+
if err := unlock(); err != nil {
1159+
// Oh dear, panic Mr Mainwaring!
1160+
panic(fmt.Errorf("failed to unlock tree state: %v", err))
1161+
}
1162+
}()
1163+
1164+
if err := m.s.ensureVersion(compatibilityVersion); err != nil {
1165+
return err
1166+
}
1167+
curSize, _, err := m.s.readTreeState(ctx)
1168+
if err != nil {
1169+
if !errors.Is(err, os.ErrNotExist) {
1170+
return fmt.Errorf("failed to load checkpoint for mirror: %v", err)
1171+
}
1172+
// Create the directory structure and write out an empty checkpoint
1173+
slog.InfoContext(ctx, "Initializing directory for POSIX mirror (this should only happen ONCE per mirror!)", slog.String("path", m.s.cfg.Path))
1174+
if err := m.s.writeTreeState(ctx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
1175+
return fmt.Errorf("failed to write tree-state checkpoint: %v", err)
1176+
}
1177+
return nil
1178+
}
1179+
m.curSize = curSize
1180+
1181+
return nil
1182+
}
1183+
1184+
// IntegrateBundles integrates a sequence of entry bundles into the tree, starting at the provided bundle index bundleIdx.
1185+
// Returns the new size and root hash of the tree if successful.
1186+
func (m *MirrorWriter) IntegrateBundles(ctx context.Context, bundleIdx uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
1187+
return 0, nil, errors.New("unimplemented")
1188+
}
1189+
1190+
func (m *MirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) {
1191+
sz, _, err := m.s.readTreeState(ctx)
1192+
return sz, err
1193+
}

0 commit comments

Comments
 (0)