Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 0 additions & 101 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3278,107 +3278,6 @@ func TestBackupRestoreIncremental(t *testing.T) {
}
}

// a bg worker is intended to write to the bank table concurrent with other
// operations (writes, backups, restores), mutating the payload on rows-maxID.
// it notified the `wake` channel (to allow ensuring bg activity has occurred)
// and can be informed when errors are allowable (e.g. when the bank table is
// unavailable between a drop and restore) via the atomic "bool" allowErrors.
func startBackgroundWrites(
stopper *stop.Stopper, sqlDB *gosql.DB, maxID int, wake chan<- struct{}, allowErrors *int32,
) error {
rng, _ := randutil.NewTestRand()

for {
select {
case <-stopper.ShouldQuiesce():
return nil // All done.
default:
// Keep going.
}

id := rand.Intn(maxID)
payload := randutil.RandBytes(rng, backupRestoreRowPayloadSize)

updateFn := func() error {
select {
case <-stopper.ShouldQuiesce():
return nil // All done.
default:
// Keep going.
}
_, err := sqlDB.Exec(`UPDATE data.bank SET payload = $1 WHERE id = $2`, payload, id)
if atomic.LoadInt32(allowErrors) == 1 {
return nil
}
return err
}
if err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, updateFn); err != nil {
return err
}
select {
case wake <- struct{}{}:
default:
}
}
}

func TestBackupRestoreWithConcurrentWrites(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const rows = 10
const numBackgroundTasks = multiNode

skip.UnderRace(t, "test is too slow under race")

ctx := context.Background()
tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, multiNode, rows, InitManualReplication)
defer cleanupFn()

bgActivity := make(chan struct{})
// allowErrors is used as an atomic bool to tell bg workers when to allow
// errors, between dropping and restoring the table they are using.
var allowErrors int32
for task := 0; task < numBackgroundTasks; task++ {
taskNum := task
_ = tc.Stopper().RunAsyncTask(ctx, "bg-task", func(context.Context) {
conn := tc.Conns[taskNum%len(tc.Conns)]
// Use different sql gateways to make sure leasing is right.
if err := startBackgroundWrites(tc.Stopper(), conn, rows, bgActivity, &allowErrors); err != nil {
t.Error(err)
}
})
}

// Use the data.bank table as a key (id), value (balance) table with a
// payload.The background tasks are mutating the table concurrently while we
// backup and restore.
<-bgActivity

// Set, break, then reset the id=balance invariant -- while doing concurrent
// writes -- to get multiple MVCC revisions as well as txn conflicts.
sqlDB.Exec(t, `UPDATE data.bank SET balance = id`)
<-bgActivity
sqlDB.Exec(t, `UPDATE data.bank SET balance = -1`)
<-bgActivity
sqlDB.Exec(t, `UPDATE data.bank SET balance = id`)
<-bgActivity

// Backup DB while concurrent writes continue.
sqlDB.Exec(t, `BACKUP DATABASE data INTO $1`, localFoo)

// Drop the table and restore from backup and check our invariant.
atomic.StoreInt32(&allowErrors, 1)
sqlDB.Exec(t, `DROP TABLE data.bank`)
sqlDB.Exec(t, `RESTORE TABLE data.* FROM LATEST IN $1`, localFoo)
atomic.StoreInt32(&allowErrors, 0)

bad := sqlDB.QueryStr(t, `SELECT id, balance, payload FROM data.bank WHERE id != balance`)
for _, r := range bad {
t.Errorf("bad row ID %s = bal %s (payload: %q)", r[0], r[1], r[2])
}
}

func TestConcurrentBackupRestores(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading