Skip to content

Commit 53e358f

Browse files
authored
[GCP] Support WithCheckpointRepublishInterval (#812)
Adds support to GCP for the WithCheckpointRepublishInterval option added in #807.
1 parent 85f0a27 commit 53e358f

2 files changed

Lines changed: 99 additions & 45 deletions

File tree

storage/gcp/gcp.go

Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ type sequencer interface {
112112
// nextIndex returns the next available index in the log.
113113
nextIndex(ctx context.Context) (uint64, error)
114114
// publishCheckpoint coordinates the publication of new checkpoints based on the current integrated tree.
115-
publishCheckpoint(ctx context.Context, minAge time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
115+
publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
116116
// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
117117
garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
118118
}
@@ -274,7 +274,7 @@ func (s *Storage) newAppender(ctx context.Context, o objStore, seq *spannerCoord
274274
}
275275

276276
go a.integrateEntriesJob(ctx)
277-
go a.publishCheckpointJob(ctx, opts.CheckpointInterval())
277+
go a.publishCheckpointJob(ctx, opts.CheckpointInterval(), opts.CheckpointRepublishInterval())
278278
if i := opts.GarbageCollectionInterval(); i > 0 {
279279
go a.garbageCollectorJob(ctx, i)
280280
}
@@ -339,8 +339,8 @@ func (a *Appender) integrateEntriesJob(ctx context.Context) {
339339
// of the tree, once per interval.
340340
//
341341
// Blocks until ctx is done.
342-
func (a *Appender) publishCheckpointJob(ctx context.Context, i time.Duration) {
343-
t := time.NewTicker(i)
342+
func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republishInterval time.Duration) {
343+
t := time.NewTicker(pubInterval)
344344
defer t.Stop()
345345
for {
346346
select {
@@ -352,7 +352,8 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, i time.Duration) {
352352
func() {
353353
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.publishCheckpointJob")
354354
defer span.End()
355-
if err := a.sequencer.publishCheckpoint(ctx, i, a.publishCheckpoint); err != nil {
355+
356+
if err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.publishCheckpoint); err != nil {
356357
klog.Warningf("publishCheckpoint failed: %v", err)
357358
}
358359
}()
@@ -731,24 +732,25 @@ func newSpannerCoordinator(ctx context.Context, dbPool *spanner.Client, maxOutst
731732
// This table coordinates integration of the batches of entries stored in
732733
// Seq into the committed tree state.
733734
func initDB(ctx context.Context, spannerDB string) error {
734-
return createAndPrepareTables(
735-
ctx, spannerDB,
735+
return createAndPrepareTables(ctx, spannerDB,
736736
[]string{
737737
"CREATE TABLE IF NOT EXISTS Tessera (id INT64 NOT NULL, compatibilityVersion INT64 NOT NULL) PRIMARY KEY (id)",
738738
"CREATE TABLE IF NOT EXISTS SeqCoord (id INT64 NOT NULL, next INT64 NOT NULL,) PRIMARY KEY (id)",
739739
"CREATE TABLE IF NOT EXISTS Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)",
740740
"CREATE TABLE IF NOT EXISTS IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)",
741-
"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL) PRIMARY KEY (id)",
741+
"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL, size INT64) PRIMARY KEY (id)",
742742
"CREATE TABLE IF NOT EXISTS GCCoord (id INT64 NOT NULL, fromSize INT64 NOT NULL) PRIMARY KEY (id)",
743743
},
744+
[]string{
745+
"ALTER TABLE PubCoord ADD COLUMN IF NOT EXISTS size INT64",
746+
},
744747
[][]*spanner.Mutation{
745748
{spanner.Insert("Tessera", []string{"id", "compatibilityVersion"}, []any{0, SchemaCompatibilityVersion})},
746749
{spanner.Insert("SeqCoord", []string{"id", "next"}, []any{0, 0})},
747750
{spanner.Insert("IntCoord", []string{"id", "seq", "rootHash"}, []any{0, 0, rfc6962.DefaultHasher.EmptyRoot()})},
748-
{spanner.Insert("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Unix(0, 0)})},
751+
{spanner.Insert("PubCoord", []string{"id", "publishedAt", "size"}, []any{0, time.Unix(0, 0), 0})},
749752
{spanner.Insert("GCCoord", []string{"id", "fromSize"}, []any{0, 0})},
750-
},
751-
)
753+
})
752754
}
753755

754756
// checkDataCompatibility compares the Tessera library SchemaCompatibilityVersion with the one stored in the
@@ -994,30 +996,34 @@ func (s *spannerCoordinator) nextIndex(ctx context.Context) (uint64, error) {
994996
return uint64(nextSeq), nil
995997
}
996998

997-
// publishCheckpoint checks when the last checkpoint was published, and if it was more than minAge ago, calls the provided
999+
// publishCheckpoint checks when the last checkpoint was published, and if appropriate, calls the provided
9981000
// function to publish a new one.
9991001
//
1002+
// A checkpoint will not be published if either:
1003+
// - the currently published checkpoint was published less than minStaleActive ago
1004+
// - the new checkpoint is the same size as the currently published one, AND the currently published checkpoint
1005+
// was published less than minStaleRepub ago.
1006+
//
10001007
// This function uses PubCoord with an exclusive lock to guarantee that only one tessera instance can attempt to publish
10011008
// a checkpoint at any given time.
1002-
func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minAge time.Duration, f func(context.Context, uint64, []byte) error) error {
1009+
func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) error {
10031010
if _, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
1004-
pRow, err := txn.ReadRowWithOptions(ctx, "PubCoord", spanner.Key{0}, []string{"publishedAt"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
1011+
pRow, err := txn.ReadRowWithOptions(ctx, "PubCoord", spanner.Key{0}, []string{"publishedAt", "size"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
10051012
if err != nil {
10061013
return fmt.Errorf("failed to read PubCoord: %w", err)
10071014
}
10081015
var pubAt time.Time
1009-
if err := pRow.Column(0, &pubAt); err != nil {
1010-
return fmt.Errorf("failed to parse publishedAt: %v", err)
1016+
var lastSize spanner.NullInt64
1017+
if err := pRow.Columns(&pubAt, &lastSize); err != nil {
1018+
return fmt.Errorf("failed to parse PubCoord: %v", err)
10111019
}
10121020

10131021
cpAge := time.Since(pubAt)
1014-
if cpAge < minAge {
1015-
klog.V(1).Infof("publishCheckpoint: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minAge)
1022+
if cpAge < minStaleActive {
1023+
klog.V(1).Infof("publishCheckpoint: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minStaleActive)
10161024
return nil
10171025
}
10181026

1019-
klog.V(1).Infof("publishCheckpoint: updating checkpoint (replacing %s old checkpoint)", cpAge)
1020-
10211027
// Can't just use currentTree() here as the spanner emulator doesn't do nested transactions, so do it manually:
10221028
row, err := txn.ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"})
10231029
if err != nil {
@@ -1028,10 +1034,29 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minAge time.
10281034
if err := row.Columns(&fromSeq, &rootHash); err != nil {
10291035
return fmt.Errorf("failed to parse integration coordination info: %v", err)
10301036
}
1031-
if err := f(ctx, uint64(fromSeq), rootHash); err != nil {
1037+
1038+
currentSize := uint64(fromSeq)
1039+
shouldPublish := minStaleRepub > 0 && cpAge >= minStaleRepub
1040+
if !shouldPublish {
1041+
if !lastSize.Valid {
1042+
// If we don't know the last published size, we should probably publish to be safe/self-heal.
1043+
shouldPublish = true
1044+
} else if currentSize > uint64(lastSize.Int64) {
1045+
shouldPublish = true
1046+
}
1047+
}
1048+
1049+
if !shouldPublish {
1050+
klog.V(1).Infof("publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
1051+
return nil
1052+
}
1053+
1054+
klog.V(1).Infof("publishCheckpoint: updating checkpoint (replacing %s old checkpoint)", cpAge)
1055+
1056+
if err := f(ctx, currentSize, rootHash); err != nil {
10321057
return err
10331058
}
1034-
if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Now()})}); err != nil {
1059+
if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt", "size"}, []any{0, time.Now(), int64(currentSize)})}); err != nil {
10351060
return err
10361061
}
10371062

@@ -1440,7 +1465,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, sourceSize uint64) (ui
14401465
// This is intended to be used to create and initialise Spanner instances on first use.
14411466
// DDL should likely be of the form "CREATE TABLE IF NOT EXISTS".
14421467
// Mutation groups should likey be one or more spanner.Insert operations - AlreadyExists errors will be silently ignored.
1443-
func createAndPrepareTables(ctx context.Context, spannerDB string, ddl []string, mutations [][]*spanner.Mutation) error {
1468+
func createAndPrepareTables(ctx context.Context, spannerDB string, ddl []string, alter []string, mutations [][]*spanner.Mutation) error {
14441469
adminClient, err := database.NewDatabaseAdminClient(ctx)
14451470
if err != nil {
14461471
return err
@@ -1462,6 +1487,23 @@ func createAndPrepareTables(ctx context.Context, spannerDB string, ddl []string,
14621487
return err
14631488
}
14641489

1490+
if len(alter) > 0 {
1491+
// The spannertest emulator appears to ignore IF NOT EXISTS in ALTER DATABASE statements, so
1492+
// we'll apply each update individually and ignore any AlreadyExists errors we see.
1493+
for _, a := range alter {
1494+
op, err := adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
1495+
Database: spannerDB,
1496+
Statements: []string{a},
1497+
})
1498+
if err != nil { // && spanner.ErrCode(err) != codes.AlreadyExists {
1499+
return fmt.Errorf("updateDatabaseDdl: %v", err)
1500+
}
1501+
if err := op.Wait(ctx); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
1502+
return fmt.Errorf("failed to alter table: %v", err)
1503+
}
1504+
}
1505+
}
1506+
14651507
dbPool, err := spanner.NewClient(ctx, spannerDB)
14661508
if err != nil {
14671509
return fmt.Errorf("failed to connect to Spanner: %v", err)

storage/gcp/gcp_test.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -363,31 +363,42 @@ func TestBundleRoundtrip(t *testing.T) {
363363
func TestPublishTree(t *testing.T) {
364364
ctx := context.Background()
365365
for _, test := range []struct {
366-
name string
367-
publishInterval time.Duration
368-
attempts []time.Duration
369-
wantUpdates int
366+
name string
367+
publishInterval time.Duration
368+
republishInterval time.Duration
369+
attempts []time.Duration
370+
wantUpdates int
370371
}{
371372
{
372-
name: "works ok",
373-
publishInterval: 100 * time.Millisecond,
374-
attempts: []time.Duration{1 * time.Second},
375-
wantUpdates: 1,
373+
name: "works ok",
374+
publishInterval: 100 * time.Millisecond,
375+
republishInterval: 100 * time.Millisecond,
376+
attempts: []time.Duration{1 * time.Second},
377+
wantUpdates: 1,
376378
}, {
377-
name: "too soon, skip update",
378-
publishInterval: 10 * time.Second,
379-
attempts: []time.Duration{100 * time.Millisecond},
380-
wantUpdates: 0,
379+
name: "too soon, skip update",
380+
publishInterval: 10 * time.Second,
381+
republishInterval: 10 * time.Second,
382+
attempts: []time.Duration{100 * time.Millisecond},
383+
wantUpdates: 0,
381384
}, {
382-
name: "too soon, skip update, but recovers",
383-
publishInterval: 2 * time.Second,
384-
attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second},
385-
wantUpdates: 1,
385+
name: "too soon, skip update, but recovers",
386+
publishInterval: 2 * time.Second,
387+
republishInterval: 2 * time.Second,
388+
attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second},
389+
wantUpdates: 1,
386390
}, {
387-
name: "many attempts, eventually one succeeds",
388-
publishInterval: 1 * time.Second,
389-
attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond},
390-
wantUpdates: 1,
391+
name: "many attempts, eventually one succeeds",
392+
publishInterval: 1 * time.Second,
393+
republishInterval: 1 * time.Second,
394+
attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond},
395+
wantUpdates: 1,
396+
}, {
397+
name: "republish needed",
398+
publishInterval: 1 * time.Second,
399+
republishInterval: 2 * time.Second,
400+
attempts: []time.Duration{1500 * time.Millisecond, 2500 * time.Millisecond},
401+
wantUpdates: 1,
391402
},
392403
} {
393404
t.Run(test.name, func(t *testing.T) {
@@ -414,7 +425,8 @@ func TestPublishTree(t *testing.T) {
414425
if err := storage.init(ctx); err != nil {
415426
t.Fatalf("storage.init: %v", err)
416427
}
417-
if err := s.publishCheckpoint(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
428+
429+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
418430
t.Fatalf("publishTree: %v", err)
419431
}
420432
cpOld := []byte("bananas")
@@ -424,7 +436,7 @@ func TestPublishTree(t *testing.T) {
424436
updatesSeen := 0
425437
for _, d := range test.attempts {
426438
time.Sleep(d)
427-
if err := s.publishCheckpoint(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
439+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
428440
t.Fatalf("publishTree: %v", err)
429441
}
430442
cpNew, _, err := m.getObject(ctx, layout.CheckpointPath)

0 commit comments

Comments
 (0)