@@ -3354,107 +3354,6 @@ func TestBackupRestoreIncremental(t *testing.T) {
33543354 }
33553355}
33563356
3357- // a bg worker is intended to write to the bank table concurrent with other
3358- // operations (writes, backups, restores), mutating the payload on rows-maxID.
3359- // it notified the `wake` channel (to allow ensuring bg activity has occurred)
3360- // and can be informed when errors are allowable (e.g. when the bank table is
3361- // unavailable between a drop and restore) via the atomic "bool" allowErrors.
3362- func startBackgroundWrites (
3363- stopper * stop.Stopper , sqlDB * gosql.DB , maxID int , wake chan <- struct {}, allowErrors * int32 ,
3364- ) error {
3365- rng , _ := randutil .NewTestRand ()
3366-
3367- for {
3368- select {
3369- case <- stopper .ShouldQuiesce ():
3370- return nil // All done.
3371- default :
3372- // Keep going.
3373- }
3374-
3375- id := rand .Intn (maxID )
3376- payload := randutil .RandBytes (rng , backupRestoreRowPayloadSize )
3377-
3378- updateFn := func () error {
3379- select {
3380- case <- stopper .ShouldQuiesce ():
3381- return nil // All done.
3382- default :
3383- // Keep going.
3384- }
3385- _ , err := sqlDB .Exec (`UPDATE data.bank SET payload = $1 WHERE id = $2` , payload , id )
3386- if atomic .LoadInt32 (allowErrors ) == 1 {
3387- return nil
3388- }
3389- return err
3390- }
3391- if err := retry .ForDuration (testutils .DefaultSucceedsSoonDuration , updateFn ); err != nil {
3392- return err
3393- }
3394- select {
3395- case wake <- struct {}{}:
3396- default :
3397- }
3398- }
3399- }
3400-
3401- func TestBackupRestoreWithConcurrentWrites (t * testing.T ) {
3402- defer leaktest .AfterTest (t )()
3403- defer log .Scope (t ).Close (t )
3404-
3405- const rows = 10
3406- const numBackgroundTasks = multiNode
3407-
3408- skip .UnderRace (t , "test is too slow under race" )
3409-
3410- ctx := context .Background ()
3411- tc , sqlDB , _ , cleanupFn := backupRestoreTestSetup (t , multiNode , rows , InitManualReplication )
3412- defer cleanupFn ()
3413-
3414- bgActivity := make (chan struct {})
3415- // allowErrors is used as an atomic bool to tell bg workers when to allow
3416- // errors, between dropping and restoring the table they are using.
3417- var allowErrors int32
3418- for task := 0 ; task < numBackgroundTasks ; task ++ {
3419- taskNum := task
3420- _ = tc .Stopper ().RunAsyncTask (ctx , "bg-task" , func (context.Context ) {
3421- conn := tc .Conns [taskNum % len (tc .Conns )]
3422- // Use different sql gateways to make sure leasing is right.
3423- if err := startBackgroundWrites (tc .Stopper (), conn , rows , bgActivity , & allowErrors ); err != nil {
3424- t .Error (err )
3425- }
3426- })
3427- }
3428-
3429- // Use the data.bank table as a key (id), value (balance) table with a
3430- // payload.The background tasks are mutating the table concurrently while we
3431- // backup and restore.
3432- <- bgActivity
3433-
3434- // Set, break, then reset the id=balance invariant -- while doing concurrent
3435- // writes -- to get multiple MVCC revisions as well as txn conflicts.
3436- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3437- <- bgActivity
3438- sqlDB .Exec (t , `UPDATE data.bank SET balance = -1` )
3439- <- bgActivity
3440- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3441- <- bgActivity
3442-
3443- // Backup DB while concurrent writes continue.
3444- sqlDB .Exec (t , `BACKUP DATABASE data INTO $1` , localFoo )
3445-
3446- // Drop the table and restore from backup and check our invariant.
3447- atomic .StoreInt32 (& allowErrors , 1 )
3448- sqlDB .Exec (t , `DROP TABLE data.bank` )
3449- sqlDB .Exec (t , `RESTORE TABLE data.* FROM LATEST IN $1` , localFoo )
3450- atomic .StoreInt32 (& allowErrors , 0 )
3451-
3452- bad := sqlDB .QueryStr (t , `SELECT id, balance, payload FROM data.bank WHERE id != balance` )
3453- for _ , r := range bad {
3454- t .Errorf ("bad row ID %s = bal %s (payload: %q)" , r [0 ], r [1 ], r [2 ])
3455- }
3456- }
3457-
34583357func TestConcurrentBackupRestores (t * testing.T ) {
34593358 defer leaktest .AfterTest (t )()
34603359 defer log .Scope (t ).Close (t )
0 commit comments