diff --git a/pkg/cmd/roachtest/test_impl.go b/pkg/cmd/roachtest/test_impl.go index 8d858e3328cd..b214c262bc79 100644 --- a/pkg/cmd/roachtest/test_impl.go +++ b/pkg/cmd/roachtest/test_impl.go @@ -148,6 +148,12 @@ type testImpl struct { // githubIpToNodeMapping contains the ip to node map that will be passed to // github.MaybePost githubIpToNodeMapping string + + // preempted is set when the test's failure was attributed to a VM + // preemption by runTest's post-failure checks. Read by the test runner + // after runTest returns to decide whether to requeue (e.g. retry a + // benchmark on non-spot VMs). + preempted bool } // Map from version to path to the cockroach binary to be used when // mixed-version test wants a binary for that binary. If a particular version @@ -524,6 +530,21 @@ func (t *testImpl) resetFailures() { t.mu.failuresSuppressed = false } +// markPreempted records that this test's failure was attributed to a VM +// preemption. See the comment on the preempted field for details. +func (t *testImpl) markPreempted() { + t.mu.Lock() + defer t.mu.Unlock() + t.mu.preempted = true +} + +// wasPreempted reports whether markPreempted was called. +func (t *testImpl) wasPreempted() bool { + t.mu.RLock() + defer t.mu.RUnlock() + return t.mu.preempted +} + // We take the "squashed" error that contains information of all the errors for each failure. func formatFailure(b *strings.Builder, reportFailures ...failure) { for i, failure := range reportFailures { diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 05fb54f6bc42..0a86341d1e24 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -1195,6 +1195,20 @@ func (r *testRunner) runWorker( } } + // A preempted benchmark loses its per-nightly perf data point: the + // runner uses --count=1 and the test selector only forces re-selection + // on the *next* nightly. Requeue one retry on a non-spot cluster (which + // is also the loop-breaker, since only spot runs can preempt). Skip + // when --count>1: workPool.decTestLocked identifies entries by name, + // so a duplicate-named entry would race with the original's remaining + // runs. + if t.wasPreempted() && testToRun.spec.Benchmark && + testToRun.spec.Cluster.UseSpotVMs && testToRun.runCount == 1 { + retrySpec := testToRun.spec + retrySpec.Cluster.UseSpotVMs = false + work.requeueForPreemptionRetry(ctx, workerL, retrySpec) + } + msg := "test passed: %s (run %d)" if t.Failed() { msg = "test failed: %s (run %d)" @@ -1367,6 +1381,7 @@ func (r *testRunner) runTest( // want to propagate the preemption error and avoid creating an issue. t.resetFailures() t.Error(vmPreemptionError(preemptedVMNames)) + t.markPreempted() } hostErrorVMNames := getHostErrorVMNames(ctx, c, l) if hostErrorVMNames != "" { diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index 9cf37869c9e9..563eb46e9e65 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -1156,6 +1156,36 @@ func TestVMPreemptionPolling(t *testing.T) { require.NoError(t, err) }) + + // Test that a benchmark whose spot VM gets preempted is automatically + // requeued for one retry. Without this, a single preemption silently + // loses the nightly's perf data point for that benchmark (the runner + // uses --count=1 and the test selector only forces re-selection on + // the *next* nightly). The retry runs on a non-spot cluster, which is + // also the loop-breaker: only spot runs can preempt. + t.Run("benchmark preempted on spot is requeued for retry", func(t *testing.T) { + // Drive preemption via the polling path: both the original and the + // retry's Run will block on ctx.Done() and be cancelled when + // monitorForPreemptedVMs observes the (always-on) preemption hook. + setPollPreemptionInterval(50 * time.Millisecond) + getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) { + return []vm.PreemptedVM{{Name: "test_node", PreemptedAt: time.Now()}}, nil + } + + var calls atomic.Int32 + benchmarkTest := mockTest + benchmarkTest.Benchmark = true + benchmarkTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) { + calls.Add(1) + <-ctx.Done() + } + + err := runner.Run(ctx, []registry.TestSpec{benchmarkTest}, 1, /* count */ + defaultParallelism, copt, testOpts{}, lopt, github) + require.NoError(t, err) + require.Equal(t, int32(2), calls.Load(), + "benchmark should run twice: original (spot, preempted) + one non-spot retry") + }) } // TestRunnerFailureAfterTimeout checks that a test has a failure added diff --git a/pkg/cmd/roachtest/work_pool.go b/pkg/cmd/roachtest/work_pool.go index 7ce353959d3d..507990af1742 100644 --- a/pkg/cmd/roachtest/work_pool.go +++ b/pkg/cmd/roachtest/work_pool.go @@ -253,6 +253,21 @@ func (p *workPool) findCompatibleTestsLocked( return tests } +// requeueForPreemptionRetry adds spec back to the pool for exactly one +// additional run. The entry is inserted at the head so workers pick it up +// promptly. The caller is responsible for any spec adjustments needed to +// break a retry loop (e.g. setting Cluster.UseSpotVMs=false so the retry +// can't itself be preempted into another retry). +func (p *workPool) requeueForPreemptionRetry( + ctx context.Context, l *logger.Logger, spec registry.TestSpec, +) { + p.mu.Lock() + defer p.mu.Unlock() + l.PrintfCtx(ctx, "requeueing %s for preemption retry (UseSpotVMs=%t)", + spec.Name, spec.Cluster.UseSpotVMs) + p.mu.tests = append([]testWithCount{{spec: spec, count: 1}}, p.mu.tests...) +} + // decTestLocked decrements a test's remaining count and removes it // from the workPool if it was exhausted. func (p *workPool) decTestLocked(ctx context.Context, l *logger.Logger, name string) {