Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/cmd/roachtest/test_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ type testImpl struct {
// githubIpToNodeMapping contains the ip to node map that will be passed to
// github.MaybePost
githubIpToNodeMapping string

// preempted is set when the test's failure was attributed to a VM
// preemption by runTest's post-failure checks. Read by the test runner
// after runTest returns to decide whether to requeue (e.g. retry a
// benchmark on non-spot VMs).
preempted bool
}
// Map from version to path to the cockroach binary to be used when
// mixed-version test wants a binary for that binary. If a particular version
Expand Down Expand Up @@ -524,6 +530,21 @@ func (t *testImpl) resetFailures() {
t.mu.failuresSuppressed = false
}

// markPreempted records that this test's failure was attributed to a VM
// preemption. See the comment on the preempted field for details.
func (t *testImpl) markPreempted() {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.preempted = true
}

// wasPreempted reports whether markPreempted was called.
func (t *testImpl) wasPreempted() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.mu.preempted
}

// We take the "squashed" error that contains information of all the errors for each failure.
func formatFailure(b *strings.Builder, reportFailures ...failure) {
for i, failure := range reportFailures {
Expand Down
15 changes: 15 additions & 0 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,20 @@ func (r *testRunner) runWorker(
}
}

// A preempted benchmark loses its per-nightly perf data point: the
// runner uses --count=1 and the test selector only forces re-selection
// on the *next* nightly. Requeue one retry on a non-spot cluster (which
// is also the loop-breaker, since only spot runs can preempt). Skip
// when --count>1: workPool.decTestLocked identifies entries by name,
// so a duplicate-named entry would race with the original's remaining
// runs.
if t.wasPreempted() && testToRun.spec.Benchmark &&
testToRun.spec.Cluster.UseSpotVMs && testToRun.runCount == 1 {
retrySpec := testToRun.spec
retrySpec.Cluster.UseSpotVMs = false
work.requeueForPreemptionRetry(ctx, workerL, retrySpec)
}

msg := "test passed: %s (run %d)"
if t.Failed() {
msg = "test failed: %s (run %d)"
Expand Down Expand Up @@ -1367,6 +1381,7 @@ func (r *testRunner) runTest(
// want to propagate the preemption error and avoid creating an issue.
t.resetFailures()
t.Error(vmPreemptionError(preemptedVMNames))
t.markPreempted()
}
hostErrorVMNames := getHostErrorVMNames(ctx, c, l)
if hostErrorVMNames != "" {
Expand Down
30 changes: 30 additions & 0 deletions pkg/cmd/roachtest/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,36 @@ func TestVMPreemptionPolling(t *testing.T) {

require.NoError(t, err)
})

// Test that a benchmark whose spot VM gets preempted is automatically
// requeued for one retry. Without this, a single preemption silently
// loses the nightly's perf data point for that benchmark (the runner
// uses --count=1 and the test selector only forces re-selection on
// the *next* nightly). The retry runs on a non-spot cluster, which is
// also the loop-breaker: only spot runs can preempt.
t.Run("benchmark preempted on spot is requeued for retry", func(t *testing.T) {
// Drive preemption via the polling path: both the original and the
// retry's Run will block on ctx.Done() and be cancelled when
// monitorForPreemptedVMs observes the (always-on) preemption hook.
setPollPreemptionInterval(50 * time.Millisecond)
getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
return []vm.PreemptedVM{{Name: "test_node", PreemptedAt: time.Now()}}, nil
}

var calls atomic.Int32
benchmarkTest := mockTest
benchmarkTest.Benchmark = true
benchmarkTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) {
calls.Add(1)
<-ctx.Done()
}

err := runner.Run(ctx, []registry.TestSpec{benchmarkTest}, 1, /* count */
defaultParallelism, copt, testOpts{}, lopt, github)
require.NoError(t, err)
require.Equal(t, int32(2), calls.Load(),
"benchmark should run twice: original (spot, preempted) + one non-spot retry")
})
}

// TestRunnerFailureAfterTimeout checks that a test has a failure added
Expand Down
15 changes: 15 additions & 0 deletions pkg/cmd/roachtest/work_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,21 @@ func (p *workPool) findCompatibleTestsLocked(
return tests
}

// requeueForPreemptionRetry adds spec back to the pool for exactly one
// additional run. The entry is inserted at the head so workers pick it up
// promptly. The caller is responsible for any spec adjustments needed to
// break a retry loop (e.g. setting Cluster.UseSpotVMs=false so the retry
// can't itself be preempted into another retry).
func (p *workPool) requeueForPreemptionRetry(
ctx context.Context, l *logger.Logger, spec registry.TestSpec,
) {
p.mu.Lock()
defer p.mu.Unlock()
l.PrintfCtx(ctx, "requeueing %s for preemption retry (UseSpotVMs=%t)",
spec.Name, spec.Cluster.UseSpotVMs)
p.mu.tests = append([]testWithCount{{spec: spec, count: 1}}, p.mu.tests...)
}

// decTestLocked decrements a test's remaining count and removes it
// from the workPool if it was exhausted.
func (p *workPool) decTestLocked(ctx context.Context, l *logger.Logger, name string) {
Expand Down
Loading