@@ -3280,107 +3280,6 @@ func TestBackupRestoreIncremental(t *testing.T) {
32803280 }
32813281}
32823282
3283- // a bg worker is intended to write to the bank table concurrent with other
3284- // operations (writes, backups, restores), mutating the payload on rows-maxID.
3285- // it notified the `wake` channel (to allow ensuring bg activity has occurred)
3286- // and can be informed when errors are allowable (e.g. when the bank table is
3287- // unavailable between a drop and restore) via the atomic "bool" allowErrors.
3288- func startBackgroundWrites (
3289- stopper * stop.Stopper , sqlDB * gosql.DB , maxID int , wake chan <- struct {}, allowErrors * int32 ,
3290- ) error {
3291- rng , _ := randutil .NewTestRand ()
3292-
3293- for {
3294- select {
3295- case <- stopper .ShouldQuiesce ():
3296- return nil // All done.
3297- default :
3298- // Keep going.
3299- }
3300-
3301- id := rand .Intn (maxID )
3302- payload := randutil .RandBytes (rng , backupRestoreRowPayloadSize )
3303-
3304- updateFn := func () error {
3305- select {
3306- case <- stopper .ShouldQuiesce ():
3307- return nil // All done.
3308- default :
3309- // Keep going.
3310- }
3311- _ , err := sqlDB .Exec (`UPDATE data.bank SET payload = $1 WHERE id = $2` , payload , id )
3312- if atomic .LoadInt32 (allowErrors ) == 1 {
3313- return nil
3314- }
3315- return err
3316- }
3317- if err := retry .ForDuration (testutils .DefaultSucceedsSoonDuration , updateFn ); err != nil {
3318- return err
3319- }
3320- select {
3321- case wake <- struct {}{}:
3322- default :
3323- }
3324- }
3325- }
3326-
3327- func TestBackupRestoreWithConcurrentWrites (t * testing.T ) {
3328- defer leaktest .AfterTest (t )()
3329- defer log .Scope (t ).Close (t )
3330-
3331- const rows = 10
3332- const numBackgroundTasks = multiNode
3333-
3334- skip .UnderRace (t , "test is too slow under race" )
3335-
3336- ctx := context .Background ()
3337- tc , sqlDB , _ , cleanupFn := backupRestoreTestSetup (t , multiNode , rows , InitManualReplication )
3338- defer cleanupFn ()
3339-
3340- bgActivity := make (chan struct {})
3341- // allowErrors is used as an atomic bool to tell bg workers when to allow
3342- // errors, between dropping and restoring the table they are using.
3343- var allowErrors int32
3344- for task := 0 ; task < numBackgroundTasks ; task ++ {
3345- taskNum := task
3346- _ = tc .Stopper ().RunAsyncTask (ctx , "bg-task" , func (context.Context ) {
3347- conn := tc .Conns [taskNum % len (tc .Conns )]
3348- // Use different sql gateways to make sure leasing is right.
3349- if err := startBackgroundWrites (tc .Stopper (), conn , rows , bgActivity , & allowErrors ); err != nil {
3350- t .Error (err )
3351- }
3352- })
3353- }
3354-
3355- // Use the data.bank table as a key (id), value (balance) table with a
3356- // payload.The background tasks are mutating the table concurrently while we
3357- // backup and restore.
3358- <- bgActivity
3359-
3360- // Set, break, then reset the id=balance invariant -- while doing concurrent
3361- // writes -- to get multiple MVCC revisions as well as txn conflicts.
3362- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3363- <- bgActivity
3364- sqlDB .Exec (t , `UPDATE data.bank SET balance = -1` )
3365- <- bgActivity
3366- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3367- <- bgActivity
3368-
3369- // Backup DB while concurrent writes continue.
3370- sqlDB .Exec (t , `BACKUP DATABASE data INTO $1` , localFoo )
3371-
3372- // Drop the table and restore from backup and check our invariant.
3373- atomic .StoreInt32 (& allowErrors , 1 )
3374- sqlDB .Exec (t , `DROP TABLE data.bank` )
3375- sqlDB .Exec (t , `RESTORE TABLE data.* FROM LATEST IN $1` , localFoo )
3376- atomic .StoreInt32 (& allowErrors , 0 )
3377-
3378- bad := sqlDB .QueryStr (t , `SELECT id, balance, payload FROM data.bank WHERE id != balance` )
3379- for _ , r := range bad {
3380- t .Errorf ("bad row ID %s = bal %s (payload: %q)" , r [0 ], r [1 ], r [2 ])
3381- }
3382- }
3383-
33843283func TestConcurrentBackupRestores (t * testing.T ) {
33853284 defer leaktest .AfterTest (t )()
33863285 defer log .Scope (t ).Close (t )
0 commit comments