@@ -3278,107 +3278,6 @@ func TestBackupRestoreIncremental(t *testing.T) {
32783278 }
32793279}
32803280
3281- // a bg worker is intended to write to the bank table concurrent with other
3282- // operations (writes, backups, restores), mutating the payload on rows-maxID.
3283- // it notified the `wake` channel (to allow ensuring bg activity has occurred)
3284- // and can be informed when errors are allowable (e.g. when the bank table is
3285- // unavailable between a drop and restore) via the atomic "bool" allowErrors.
3286- func startBackgroundWrites (
3287- stopper * stop.Stopper , sqlDB * gosql.DB , maxID int , wake chan <- struct {}, allowErrors * int32 ,
3288- ) error {
3289- rng , _ := randutil .NewTestRand ()
3290-
3291- for {
3292- select {
3293- case <- stopper .ShouldQuiesce ():
3294- return nil // All done.
3295- default :
3296- // Keep going.
3297- }
3298-
3299- id := rand .Intn (maxID )
3300- payload := randutil .RandBytes (rng , backupRestoreRowPayloadSize )
3301-
3302- updateFn := func () error {
3303- select {
3304- case <- stopper .ShouldQuiesce ():
3305- return nil // All done.
3306- default :
3307- // Keep going.
3308- }
3309- _ , err := sqlDB .Exec (`UPDATE data.bank SET payload = $1 WHERE id = $2` , payload , id )
3310- if atomic .LoadInt32 (allowErrors ) == 1 {
3311- return nil
3312- }
3313- return err
3314- }
3315- if err := retry .ForDuration (testutils .DefaultSucceedsSoonDuration , updateFn ); err != nil {
3316- return err
3317- }
3318- select {
3319- case wake <- struct {}{}:
3320- default :
3321- }
3322- }
3323- }
3324-
3325- func TestBackupRestoreWithConcurrentWrites (t * testing.T ) {
3326- defer leaktest .AfterTest (t )()
3327- defer log .Scope (t ).Close (t )
3328-
3329- const rows = 10
3330- const numBackgroundTasks = multiNode
3331-
3332- skip .UnderRace (t , "test is too slow under race" )
3333-
3334- ctx := context .Background ()
3335- tc , sqlDB , _ , cleanupFn := backupRestoreTestSetup (t , multiNode , rows , InitManualReplication )
3336- defer cleanupFn ()
3337-
3338- bgActivity := make (chan struct {})
3339- // allowErrors is used as an atomic bool to tell bg workers when to allow
3340- // errors, between dropping and restoring the table they are using.
3341- var allowErrors int32
3342- for task := 0 ; task < numBackgroundTasks ; task ++ {
3343- taskNum := task
3344- _ = tc .Stopper ().RunAsyncTask (ctx , "bg-task" , func (context.Context ) {
3345- conn := tc .Conns [taskNum % len (tc .Conns )]
3346- // Use different sql gateways to make sure leasing is right.
3347- if err := startBackgroundWrites (tc .Stopper (), conn , rows , bgActivity , & allowErrors ); err != nil {
3348- t .Error (err )
3349- }
3350- })
3351- }
3352-
3353- // Use the data.bank table as a key (id), value (balance) table with a
3354- // payload.The background tasks are mutating the table concurrently while we
3355- // backup and restore.
3356- <- bgActivity
3357-
3358- // Set, break, then reset the id=balance invariant -- while doing concurrent
3359- // writes -- to get multiple MVCC revisions as well as txn conflicts.
3360- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3361- <- bgActivity
3362- sqlDB .Exec (t , `UPDATE data.bank SET balance = -1` )
3363- <- bgActivity
3364- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3365- <- bgActivity
3366-
3367- // Backup DB while concurrent writes continue.
3368- sqlDB .Exec (t , `BACKUP DATABASE data INTO $1` , localFoo )
3369-
3370- // Drop the table and restore from backup and check our invariant.
3371- atomic .StoreInt32 (& allowErrors , 1 )
3372- sqlDB .Exec (t , `DROP TABLE data.bank` )
3373- sqlDB .Exec (t , `RESTORE TABLE data.* FROM LATEST IN $1` , localFoo )
3374- atomic .StoreInt32 (& allowErrors , 0 )
3375-
3376- bad := sqlDB .QueryStr (t , `SELECT id, balance, payload FROM data.bank WHERE id != balance` )
3377- for _ , r := range bad {
3378- t .Errorf ("bad row ID %s = bal %s (payload: %q)" , r [0 ], r [1 ], r [2 ])
3379- }
3380- }
3381-
33823281func TestConcurrentBackupRestores (t * testing.T ) {
33833282 defer leaktest .AfterTest (t )()
33843283 defer log .Scope (t ).Close (t )
0 commit comments