Skip to content

Commit 48ac7b3

Browse files
authored
Allow application to optionally configure storage infra clients (#719)
This PR allows applications using Tessera to (optionally) pass pre-configured instances of infrastructure clients to the drivers, enabling applications to tune those clients to the intended usage patterns and environment (e.g. https://pkg.go.dev/google.golang.org/api/option). In this PR, we also make posix.New take a Config struct, as in the gcp and aws packages to both improve ergonomics when switching between backends, and to open the door to adding future config options to POSIX in a backwards-compatible manner.
1 parent d9fbef2 commit 48ac7b3

11 files changed

Lines changed: 125 additions & 65 deletions

File tree

README_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func constructStorage() {
4848
ctx := context.Background()
4949

5050
// #region construct_example
51-
driver, _ := posix.New(ctx, "/tmp/mylog")
51+
driver, _ := posix.New(ctx, posix.Config{Path: "/tmp/mylog"})
5252
signer := createSigner()
5353

5454
appender, shutdown, reader, err := tessera.NewAppender(
@@ -67,7 +67,7 @@ func constructAndUseAppender() {
6767
ctx := context.Background()
6868
data := []byte("hello")
6969

70-
driver, _ := posix.New(ctx, "/tmp/mylog")
70+
driver, _ := posix.New(ctx, posix.Config{Path: "/tmp/mylog"})
7171
signer := createSigner()
7272

7373
// #region use_appender_example

cmd/conformance/posix/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func main() {
6969
s, a := getSignersOrDie()
7070

7171
// Create the Tessera POSIX storage, using the directory from the --storage_dir flag
72-
driver, err := posix.New(ctx, *storageDir)
72+
driver, err := posix.New(ctx, posix.Config{Path: *storageDir})
7373
if err != nil {
7474
klog.Exitf("Failed to construct storage: %v", err)
7575
}

cmd/examples/posix-oneshot/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ func main() {
7070
// add all of these leaves without creating any intermediate checkpoints.
7171
driver, err := posix.New(
7272
ctx,
73-
*storageDir,
73+
posix.Config{
74+
Path: *storageDir,
75+
},
7476
)
7577
if err != nil {
7678
klog.Exitf("Failed to construct storage: %v", err)

cmd/experimental/migrate/posix/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func main() {
6363
klog.Exitf("invalid checkpoint roothash %q: %v", bits[2], err)
6464
}
6565

66-
driver, err := posix.New(ctx, *storageDir)
66+
driver, err := posix.New(ctx, posix.Config{Path: *storageDir})
6767
if err != nil {
6868
klog.Exitf("Failed to create new POSIX storage driver: %v", err)
6969
}

internal/witness/witness_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func TestWitnessGateway_Update(t *testing.T) {
174174

175175
func TestWitness_UpdateRequest(t *testing.T) {
176176
logSignedCheckpoint, _ := loadCheckpoint(t, 9)
177-
d, err := posix.New(context.Background(), "../../testdata/log/")
177+
d, err := posix.New(context.Background(), posix.Config{Path: "../../testdata/log/"})
178178
if err != nil {
179179
t.Fatal(err)
180180
}

storage/aws/aws.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ type Config struct {
151151
MaxOpenConns int
152152
// Maximum idle database connections in the connection pool.
153153
MaxIdleConns int
154+
155+
// HTTPClient will be used for other HTTP requests. If unset, Tessera will use the net/http DefaultClient.
156+
HTTPClient *http.Client
154157
}
155158

156159
// New creates a new instance of the AWS based Storage.
@@ -221,7 +224,7 @@ func (s *Storage) newAppender(ctx context.Context, o objStore, seq sequencer, op
221224
logStore: logStore,
222225
sequencer: seq,
223226
queue: storage.NewQueue(ctx, opts.BatchMaxAge(), opts.BatchMaxSize(), seq.assignEntries),
224-
newCP: opts.CheckpointPublisher(logStore, http.DefaultClient),
227+
newCP: opts.CheckpointPublisher(logStore, s.cfg.HTTPClient),
225228
treeUpdated: make(chan struct{}),
226229
}
227230

storage/gcp/gcp.go

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ type consumeFunc func(ctx context.Context, from uint64, entries []storage.Sequen
123123

124124
// Config holds GCP project and resource configuration for a storage instance.
125125
type Config struct {
126+
// GCSClient will be used to interact with GCS. If unset, Tessera will create one.
127+
GCSClient *gcs.Client
128+
// SpannerClient will be used to interact with Spanner. If unset, Tessera will create one.
129+
SpannerClient *spanner.Client
130+
// HTTPClient will be used for other HTTP requests. If unset, Tessera will use the net/http DefaultClient.
131+
HTTPClient *http.Client
132+
126133
// Bucket is the name of the GCS bucket to use for storing log state.
127134
Bucket string
128135
// BucketPrefix is an optional prefix to prepend to all log resource paths.
@@ -134,6 +141,9 @@ type Config struct {
134141

135142
// New creates a new instance of the GCP based Storage.
136143
func New(ctx context.Context, cfg Config) (tessera.Driver, error) {
144+
if cfg.HTTPClient == nil {
145+
cfg.HTTPClient = http.DefaultClient
146+
}
137147
return &Storage{
138148
cfg: cfg,
139149
}, nil
@@ -192,17 +202,31 @@ func (lr *LogReader) NextIndex(ctx context.Context) (uint64, error) {
192202

193203
// Appender creates a new tessera.Appender lifecycle object.
194204
func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) {
195-
c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
196-
if err != nil {
197-
return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
205+
if s.cfg.GCSClient == nil {
206+
var err error
207+
s.cfg.GCSClient, err = gcs.NewClient(ctx, gcs.WithJSONReads())
208+
if err != nil {
209+
return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
210+
}
198211
}
199212
gs := &gcsStorage{
200-
gcsClient: c,
213+
gcsClient: s.cfg.GCSClient,
201214
bucket: s.cfg.Bucket,
202215
bucketPrefix: s.cfg.BucketPrefix,
203216
}
204217

205-
seq, err := newSpannerCoordinator(ctx, s.cfg.Spanner, uint64(opts.PushbackMaxOutstanding()))
218+
var err error
219+
if s.cfg.SpannerClient == nil {
220+
s.cfg.SpannerClient, err = spanner.NewClient(ctx, s.cfg.Spanner)
221+
if err != nil {
222+
return nil, nil, fmt.Errorf("failed to connect to Spanner: %v", err)
223+
}
224+
}
225+
if err := initDB(ctx, s.cfg.Spanner); err != nil {
226+
return nil, nil, fmt.Errorf("failed to verify/init Spanner schema: %v", err)
227+
}
228+
229+
seq, err := newSpannerCoordinator(ctx, s.cfg.SpannerClient, uint64(opts.PushbackMaxOutstanding()))
206230
if err != nil {
207231
return nil, nil, fmt.Errorf("failed to create Spanner coordinator: %v", err)
208232
}
@@ -242,7 +266,7 @@ func (s *Storage) newAppender(ctx context.Context, o objStore, seq *spannerCoord
242266
return a.sequencer.nextIndex(ctx)
243267
},
244268
}
245-
a.newCP = opts.CheckpointPublisher(reader, http.DefaultClient)
269+
a.newCP = opts.CheckpointPublisher(reader, s.cfg.HTTPClient)
246270

247271
if err := a.init(ctx); err != nil {
248272
return nil, nil, fmt.Errorf("failed to initialise log storage: %v", err)
@@ -681,18 +705,11 @@ type spannerCoordinator struct {
681705

682706
// newSpannerCoordinator returns a new spannerSequencer struct which uses the provided
683707
// spanner resource name for its spanner connection.
684-
func newSpannerCoordinator(ctx context.Context, spannerDB string, maxOutstanding uint64) (*spannerCoordinator, error) {
685-
dbPool, err := spanner.NewClient(ctx, spannerDB)
686-
if err != nil {
687-
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
688-
}
708+
func newSpannerCoordinator(ctx context.Context, dbPool *spanner.Client, maxOutstanding uint64) (*spannerCoordinator, error) {
689709
r := &spannerCoordinator{
690710
dbPool: dbPool,
691711
maxOutstanding: maxOutstanding,
692712
}
693-
if err := r.initDB(ctx, spannerDB); err != nil {
694-
return nil, fmt.Errorf("failed to initDB: %v", err)
695-
}
696713
if err := r.checkDataCompatibility(ctx); err != nil {
697714
return nil, fmt.Errorf("schema is not compatible with this version of the Tessera library: %v", err)
698715
}
@@ -712,7 +729,7 @@ func newSpannerCoordinator(ctx context.Context, spannerDB string, maxOutstanding
712729
// - IntCoord
713730
// This table coordinates integration of the batches of entries stored in
714731
// Seq into the committed tree state.
715-
func (s *spannerCoordinator) initDB(ctx context.Context, spannerDB string) error {
732+
func initDB(ctx context.Context, spannerDB string) error {
716733
return createAndPrepareTables(
717734
ctx, spannerDB,
718735
[]string{
@@ -1217,12 +1234,25 @@ func (s *gcsStorage) deleteObjectsWithPrefix(ctx context.Context, objPrefix stri
12171234

12181235
// MigrationWriter creates a new GCP storage for the MigrationTarget lifecycle mode.
12191236
func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOptions) (migrate.MigrationWriter, tessera.LogReader, error) {
1220-
c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
1221-
if err != nil {
1222-
return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
1237+
var err error
1238+
if s.cfg.GCSClient == nil {
1239+
s.cfg.GCSClient, err = gcs.NewClient(ctx, gcs.WithJSONReads())
1240+
if err != nil {
1241+
return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
1242+
}
1243+
}
1244+
1245+
if s.cfg.SpannerClient == nil {
1246+
s.cfg.SpannerClient, err = spanner.NewClient(ctx, s.cfg.Spanner)
1247+
if err != nil {
1248+
return nil, nil, fmt.Errorf("failed to connect to Spanner: %v", err)
1249+
}
1250+
}
1251+
if err := initDB(ctx, s.cfg.Spanner); err != nil {
1252+
return nil, nil, fmt.Errorf("failed to verify/init Spanner schema: %v", err)
12231253
}
12241254

1225-
seq, err := newSpannerCoordinator(ctx, s.cfg.Spanner, 0)
1255+
seq, err := newSpannerCoordinator(ctx, s.cfg.SpannerClient, 0)
12261256
if err != nil {
12271257
return nil, nil, fmt.Errorf("failed to create Spanner sequencer: %v", err)
12281258
}
@@ -1233,7 +1263,7 @@ func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOp
12331263
sequencer: seq,
12341264
logStore: &logResourceStore{
12351265
objStore: &gcsStorage{
1236-
gcsClient: c,
1266+
gcsClient: s.cfg.GCSClient,
12371267
bucket: s.cfg.Bucket,
12381268
bucketPrefix: s.cfg.BucketPrefix,
12391269
},

storage/gcp/gcp_test.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
"golang.org/x/mod/sumdb/note"
4242
)
4343

44-
func newSpannerDB(t *testing.T) func() {
44+
func newSpannerDB(t *testing.T) (*spanner.Client, func()) {
4545
t.Helper()
4646
srv, err := spannertest.NewServer("localhost:0")
4747
if err != nil {
@@ -50,16 +50,26 @@ func newSpannerDB(t *testing.T) func() {
5050
if err := os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr); err != nil {
5151
t.Fatalf("Setenv: %v", err)
5252
}
53-
return srv.Close
53+
54+
id := "projects/p/instances/i/databases/d"
55+
if err := initDB(t.Context(), id); err != nil {
56+
t.Fatalf("initDB: %v", err)
57+
}
58+
59+
c, err := spanner.NewClient(t.Context(), id)
60+
if err != nil {
61+
t.Fatalf("NewClient: %v", err)
62+
}
63+
return c, srv.Close
5464

5565
}
5666

5767
func TestSpannerSequencerAssignEntries(t *testing.T) {
5868
ctx := context.Background()
59-
close := newSpannerDB(t)
69+
db, close := newSpannerDB(t)
6070
defer close()
6171

62-
seq, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
72+
seq, err := newSpannerCoordinator(ctx, db, 1000)
6373
if err != nil {
6474
t.Fatalf("newSpannerCoordinator: %v", err)
6575
}
@@ -109,10 +119,10 @@ func TestSpannerSequencerPushback(t *testing.T) {
109119
},
110120
} {
111121
t.Run(test.name, func(t *testing.T) {
112-
close := newSpannerDB(t)
122+
db, close := newSpannerDB(t)
113123
defer close()
114124

115-
seq, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", test.threshold)
125+
seq, err := newSpannerCoordinator(ctx, db, test.threshold)
116126
if err != nil {
117127
t.Fatalf("newSpannerCoordinator: %v", err)
118128
}
@@ -139,10 +149,10 @@ func TestSpannerSequencerPushback(t *testing.T) {
139149

140150
func TestSpannerSequencerRoundTrip(t *testing.T) {
141151
ctx := context.Background()
142-
close := newSpannerDB(t)
152+
db, close := newSpannerDB(t)
143153
defer close()
144154

145-
s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
155+
s, err := newSpannerCoordinator(ctx, db, 1000)
146156
if err != nil {
147157
t.Fatalf("newSpannerCoordinator: %v", err)
148158
}
@@ -191,10 +201,10 @@ func TestSpannerSequencerRoundTrip(t *testing.T) {
191201

192202
func TestCheckDataCompatibility(t *testing.T) {
193203
ctx := context.Background()
194-
close := newSpannerDB(t)
204+
db, close := newSpannerDB(t)
195205
defer close()
196206

197-
s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
207+
s, err := newSpannerCoordinator(ctx, db, 1000)
198208
if err != nil {
199209
t.Fatalf("newSpannerCoordinator: %v", err)
200210
}
@@ -381,9 +391,9 @@ func TestPublishTree(t *testing.T) {
381391
},
382392
} {
383393
t.Run(test.name, func(t *testing.T) {
384-
closeDB := newSpannerDB(t)
394+
db, closeDB := newSpannerDB(t)
385395
defer closeDB()
386-
s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
396+
s, err := newSpannerCoordinator(ctx, db, 1000)
387397
if err != nil {
388398
t.Fatalf("newSpannerCoordinator: %v", err)
389399
}
@@ -438,10 +448,10 @@ func TestGarbageCollect(t *testing.T) {
438448
batchSize := uint64(60000)
439449
integrateEvery := uint64(31234)
440450

441-
closeDB := newSpannerDB(t)
451+
db, closeDB := newSpannerDB(t)
442452
defer closeDB()
443453

444-
s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", batchSize)
454+
s, err := newSpannerCoordinator(ctx, db, batchSize)
445455
if err != nil {
446456
t.Fatalf("newSpannerCoordinator: %v", err)
447457
}

0 commit comments

Comments
 (0)