@@ -3262,107 +3262,6 @@ func TestBackupRestoreIncremental(t *testing.T) {
32623262 }
32633263}
32643264
3265- // a bg worker is intended to write to the bank table concurrent with other
3266- // operations (writes, backups, restores), mutating the payload on rows-maxID.
3267- // it notified the `wake` channel (to allow ensuring bg activity has occurred)
3268- // and can be informed when errors are allowable (e.g. when the bank table is
3269- // unavailable between a drop and restore) via the atomic "bool" allowErrors.
3270- func startBackgroundWrites (
3271- stopper * stop.Stopper , sqlDB * gosql.DB , maxID int , wake chan <- struct {}, allowErrors * int32 ,
3272- ) error {
3273- rng , _ := randutil .NewTestRand ()
3274-
3275- for {
3276- select {
3277- case <- stopper .ShouldQuiesce ():
3278- return nil // All done.
3279- default :
3280- // Keep going.
3281- }
3282-
3283- id := rand .Intn (maxID )
3284- payload := randutil .RandBytes (rng , backupRestoreRowPayloadSize )
3285-
3286- updateFn := func () error {
3287- select {
3288- case <- stopper .ShouldQuiesce ():
3289- return nil // All done.
3290- default :
3291- // Keep going.
3292- }
3293- _ , err := sqlDB .Exec (`UPDATE data.bank SET payload = $1 WHERE id = $2` , payload , id )
3294- if atomic .LoadInt32 (allowErrors ) == 1 {
3295- return nil
3296- }
3297- return err
3298- }
3299- if err := retry .ForDuration (testutils .DefaultSucceedsSoonDuration , updateFn ); err != nil {
3300- return err
3301- }
3302- select {
3303- case wake <- struct {}{}:
3304- default :
3305- }
3306- }
3307- }
3308-
3309- func TestBackupRestoreWithConcurrentWrites (t * testing.T ) {
3310- defer leaktest .AfterTest (t )()
3311- defer log .Scope (t ).Close (t )
3312-
3313- const rows = 10
3314- const numBackgroundTasks = multiNode
3315-
3316- skip .UnderRace (t , "test is too slow under race" )
3317-
3318- ctx := context .Background ()
3319- tc , sqlDB , _ , cleanupFn := backupRestoreTestSetup (t , multiNode , rows , InitManualReplication )
3320- defer cleanupFn ()
3321-
3322- bgActivity := make (chan struct {})
3323- // allowErrors is used as an atomic bool to tell bg workers when to allow
3324- // errors, between dropping and restoring the table they are using.
3325- var allowErrors int32
3326- for task := 0 ; task < numBackgroundTasks ; task ++ {
3327- taskNum := task
3328- _ = tc .Stopper ().RunAsyncTask (ctx , "bg-task" , func (context.Context ) {
3329- conn := tc .Conns [taskNum % len (tc .Conns )]
3330- // Use different sql gateways to make sure leasing is right.
3331- if err := startBackgroundWrites (tc .Stopper (), conn , rows , bgActivity , & allowErrors ); err != nil {
3332- t .Error (err )
3333- }
3334- })
3335- }
3336-
3337- // Use the data.bank table as a key (id), value (balance) table with a
3338- // payload.The background tasks are mutating the table concurrently while we
3339- // backup and restore.
3340- <- bgActivity
3341-
3342- // Set, break, then reset the id=balance invariant -- while doing concurrent
3343- // writes -- to get multiple MVCC revisions as well as txn conflicts.
3344- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3345- <- bgActivity
3346- sqlDB .Exec (t , `UPDATE data.bank SET balance = -1` )
3347- <- bgActivity
3348- sqlDB .Exec (t , `UPDATE data.bank SET balance = id` )
3349- <- bgActivity
3350-
3351- // Backup DB while concurrent writes continue.
3352- sqlDB .Exec (t , `BACKUP DATABASE data INTO $1` , localFoo )
3353-
3354- // Drop the table and restore from backup and check our invariant.
3355- atomic .StoreInt32 (& allowErrors , 1 )
3356- sqlDB .Exec (t , `DROP TABLE data.bank` )
3357- sqlDB .Exec (t , `RESTORE TABLE data.* FROM LATEST IN $1` , localFoo )
3358- atomic .StoreInt32 (& allowErrors , 0 )
3359-
3360- bad := sqlDB .QueryStr (t , `SELECT id, balance, payload FROM data.bank WHERE id != balance` )
3361- for _ , r := range bad {
3362- t .Errorf ("bad row ID %s = bal %s (payload: %q)" , r [0 ], r [1 ], r [2 ])
3363- }
3364- }
3365-
33663265func TestConcurrentBackupRestores (t * testing.T ) {
33673266 defer leaktest .AfterTest (t )()
33683267 defer log .Scope (t ).Close (t )
0 commit comments