diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index aa5f0ce3dd59..70e88266c278 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -808,6 +808,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	rangeStatsFetcher := rangestats.NewFetcher(cfg.db)
 
 	vecIndexManager := vecindex.NewManager(ctx, cfg.stopper, &cfg.Settings.SV, codec, cfg.internalDB)
+
+	if vecIndexKnobs, ok := cfg.TestingKnobs.VecIndexTestingKnobs.(*vecindex.VecIndexTestingKnobs); ok && vecIndexKnobs != nil {
+		vecIndexManager.SetTestingKnobs(vecIndexKnobs)
+	}
+
 	cfg.registry.AddMetricStruct(vecIndexManager.Metrics())
 
 	// Set up the DistSQL server.
diff --git a/pkg/sql/colexecerror/error.go b/pkg/sql/colexecerror/error.go
index d573f6f34ee0..1777f5a0fa9a 100644
--- a/pkg/sql/colexecerror/error.go
+++ b/pkg/sql/colexecerror/error.go
@@ -170,12 +170,13 @@ func CatchVectorizedRuntimeError(operation func()) (retErr error) {
 // Multiple actual packages can have the same prefix as a single constant string
 // defined below, but all of such packages are allowed to be caught from.
 const (
-	colPackagesPrefix      = "github.com/cockroachdb/cockroach/pkg/col"
-	encodingPackagePrefix  = "github.com/cockroachdb/cockroach/pkg/util/encoding"
-	execinfraPackagePrefix = "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
-	sqlColPackagesPrefix   = "github.com/cockroachdb/cockroach/pkg/sql/col"
-	sqlRowPackagesPrefix   = "github.com/cockroachdb/cockroach/pkg/sql/row"
-	sqlSemPackagesPrefix   = "github.com/cockroachdb/cockroach/pkg/sql/sem"
+	colPackagesPrefix         = "github.com/cockroachdb/cockroach/pkg/col"
+	encodingPackagePrefix     = "github.com/cockroachdb/cockroach/pkg/util/encoding"
+	execinfraPackagePrefix    = "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+	sqlColPackagesPrefix      = "github.com/cockroachdb/cockroach/pkg/sql/col"
+	sqlRowPackagesPrefix      = "github.com/cockroachdb/cockroach/pkg/sql/row"
+	sqlSemPackagesPrefix      = "github.com/cockroachdb/cockroach/pkg/sql/sem"
+	sqlVecindexPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/vecindex"
 	// When running BenchmarkCatchVectorizedRuntimeError under bazel, the
 	// repository prefix is missing.
 	testSqlColPackagesPrefix = "pkg/sql/col"
@@ -217,6 +218,7 @@ func shouldCatchPanic(panicEmittedFrom string) bool {
 		strings.HasPrefix(panicEmittedFrom, sqlColPackagesPrefix) ||
 		strings.HasPrefix(panicEmittedFrom, sqlRowPackagesPrefix) ||
 		strings.HasPrefix(panicEmittedFrom, sqlSemPackagesPrefix) ||
+		strings.HasPrefix(panicEmittedFrom, sqlVecindexPackagesPrefix) ||
 		strings.HasPrefix(panicEmittedFrom, testSqlColPackagesPrefix)
 }
 
diff --git a/pkg/sql/rowexec/vector_search.go b/pkg/sql/rowexec/vector_search.go
index 2de9745290b8..dc068cab6d91 100644
--- a/pkg/sql/rowexec/vector_search.go
+++ b/pkg/sql/rowexec/vector_search.go
@@ -71,6 +71,9 @@ func newVectorSearchProcessor(
 	rerankMultiplier := int(flowCtx.EvalCtx.SessionData().VectorSearchRerankMultiplier)
 	v.searcher.Init(flowCtx.EvalCtx, idx, flowCtx.Txn, &spec.GetFullVectorsFetchSpec,
 		searchBeamSize, maxResults, rerankMultiplier)
+	if mgr, ok := flowCtx.Cfg.VecIndexManager.(*vecindex.Manager); ok {
+		v.searcher.SetTestingKnobs(mgr.TestingKnobs())
+	}
 	colTypes := make([]*types.T, len(v.fetchSpec.FetchedColumns))
 	for i, col := range v.fetchSpec.FetchedColumns {
 		colTypes[i] = col.Type
@@ -276,6 +279,9 @@ func newVectorMutationSearchProcessor(
 		return nil, err
 	}
 	v.searcher.Init(flowCtx.EvalCtx, idx, flowCtx.Txn, &spec.GetFullVectorsFetchSpec)
+	if mgr, ok := flowCtx.Cfg.VecIndexManager.(*vecindex.Manager); ok {
+		v.searcher.SetTestingKnobs(mgr.TestingKnobs())
+	}
 
 	// Pass through the input columns, and add the partition column and optional
 	// quantized vector column.
diff --git a/pkg/sql/vecindex/BUILD.bazel b/pkg/sql/vecindex/BUILD.bazel
index 1de64e393af4..6994fb05fb70 100644
--- a/pkg/sql/vecindex/BUILD.bazel
+++ b/pkg/sql/vecindex/BUILD.bazel
@@ -69,6 +69,7 @@ go_test(
         "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/desctestutils",
         "//pkg/sql/catalog/tabledesc",
+        "//pkg/sql/colexecerror",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/catid",
         "//pkg/sql/sem/eval",
@@ -93,6 +94,7 @@ go_test(
         "//pkg/util/randutil",
         "//pkg/util/vector",
         "@com_github_cockroachdb_datadriven//:datadriven",
+        "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/sql/vecindex/cspann/index.go b/pkg/sql/vecindex/cspann/index.go
index d3f53677f3b3..bf9a4959af77 100644
--- a/pkg/sql/vecindex/cspann/index.go
+++ b/pkg/sql/vecindex/cspann/index.go
@@ -110,6 +110,11 @@ type IndexOptions struct {
 	// MaxDeleteAttempts controls the number of times Delete or SearchForDelete
 	// will retry after failed attempts to find a requested deletion vector.
 	MaxDeleteAttempts int
+	// PanicDuringCspannSearch, if non-nil, is invoked at the top of
+	// Index.Search. Used by tests in the parent vecindex package to inject
+	// panics that exercise the colexecerror allow-list for this subpackage.
+	// Always nil in production.
+	PanicDuringCspannSearch func()
 }
 
 // SearchOptions specifies options that apply to a particular search operation
@@ -490,6 +495,10 @@ func (vi *Index) Search(
 	searchSet *SearchSet,
 	options SearchOptions,
 ) error {
+	if vi.options.PanicDuringCspannSearch != nil {
+		vi.options.PanicDuringCspannSearch()
+	}
+
 	vi.setupContext(idxCtx, treeKey, LeafLevel, options)
 	idxCtx.query.InitOriginal(vi.quantizer.GetDistanceMetric(), vec, &vi.rot)
 	return vi.searchHelper(ctx, idxCtx, searchSet)
diff --git a/pkg/sql/vecindex/manager.go b/pkg/sql/vecindex/manager.go
index 703183684d39..dfbf884c4c05 100644
--- a/pkg/sql/vecindex/manager.go
+++ b/pkg/sql/vecindex/manager.go
@@ -83,6 +83,13 @@ func (m *Manager) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
 	m.testingKnobs = knobs
 }
 
+// TestingKnobs returns the testing knobs configured on the manager, or nil if
+// none have been set. Exposed so that executor processors can propagate the
+// knobs to per-flow helpers such as Searcher.
+func (m *Manager) TestingKnobs() *VecIndexTestingKnobs {
+	return m.testingKnobs
+}
+
 // Metrics returns a metric.Struct which holds metrics for all vector indexes
 // maintained by the manager.
 func (m *Manager) Metrics() metric.Struct {
@@ -121,6 +128,36 @@ func (m *Manager) Get(
 	e = &indexEntry{mustWait: true, waitCond: sync.Cond{L: &m.mu}}
 	m.mu.indexes[idxKey] = e
 
+	// If the index construction below panics, mustWait will still be true
+	// because the normal completion path (which sets mustWait=false and calls
+	// Broadcast) is skipped. Wrap the panic cause into e.err so parked waiters
+	// get a meaningful error, drop the cache entry so subsequent callers retry
+	// instead of hanging, then rethrow so colexecerror catches the original
+	// panic for the calling goroutine as usual.
+	defer func() {
+		if !e.mustWait {
+			return
+		}
+
+		e.mustWait = false
+
+		r := recover()
+		cause, ok := r.(error)
+		if !ok {
+			// Covers both nil (Goexit-style exit) and non-error panic values.
+			cause = errors.Newf("non-error panic value: %v", r)
+		}
+		e.err = errors.NewAssertionErrorWithWrappedErrf(
+			cause, "vector index construction panicked")
+
+		e.waitCond.Broadcast()
+		delete(m.mu.indexes, idxKey)
+
+		if r != nil {
+			panic(r)
+		}
+	}()
+
 	idx, err := func() (*cspann.Index, error) {
 		// Unlock while we build the index structure so that concurrent requests can be
 		// serviced. We've already set mustWait to true, so other requests will wait
@@ -167,13 +204,13 @@ func (m *Manager) Get(
 
 	if err != nil {
 		// Don't keep the index entry around, so that we retry the query.
-		m.mu.indexes[idxKey] = nil
+		delete(m.mu.indexes, idxKey)
 	}
 	return idx, err
 }
 
 func (m *Manager) getIndexOptions(config *vecpb.Config, readOnly bool) *cspann.IndexOptions {
-	return &cspann.IndexOptions{
+	opts := &cspann.IndexOptions{
 		RotAlgorithm:     config.RotAlgorithm,
 		MinPartitionSize: int(config.MinPartitionSize),
 		MaxPartitionSize: int(config.MaxPartitionSize),
@@ -187,6 +224,20 @@ func (m *Manager) getIndexOptions(config *vecpb.Config, readOnly bool) *cspann.I
 		// Disable adaptive search until it's extended to work with vecstore.
 		DisableAdaptiveSearch: true,
 	}
+	if m.testingKnobs != nil {
+		// Install a lazy closure rather than copying the function pointer,
+		// because Manager caches *cspann.Index and its IndexOptions for its
+		// lifetime: tests that set knob fields after the index is created (as
+		// TestVectorIndexPanicCaught does) would otherwise see a stale nil.
+		// The closure re-reads m.testingKnobs and nil-checks it, because
+		// SetTestingKnobs(nil) can run while a cached index still holds it.
+		opts.PanicDuringCspannSearch = func() {
+			if knobs := m.testingKnobs; knobs != nil && knobs.PanicDuringCspannSearch != nil {
+				knobs.PanicDuringCspannSearch()
+			}
+		}
+	}
+	return opts
 }
 
 func (m *Manager) getVecConfig(
diff --git a/pkg/sql/vecindex/manager_test.go b/pkg/sql/vecindex/manager_test.go
index 861ab255f4f0..5c314d39b040 100644
--- a/pkg/sql/vecindex/manager_test.go
+++ b/pkg/sql/vecindex/manager_test.go
@@ -8,6 +8,7 @@ package vecindex_test
 import (
 	"context"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -30,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -240,4 +242,82 @@ func TestVectorManager(t *testing.T) {
 		}
 		vectorMgr.SetTestingKnobs(nil)
 	})
+
+	t.Run("test panic during pull recovers cleanly", func(t *testing.T) {
+		// Block the panicking goroutine until the other 10 callers are parked on
+		// e.waitCond, so the panic propagates exactly when 10 waiters depend on
+		// the cleanup defer in Manager.Get.
+		pullDelayer := sync.WaitGroup{}
+		pullDelayer.Add(10)
+		testingKnobs := vecindex.VecIndexTestingKnobs{
+			DuringVecIndexPull: func() {
+				pullDelayer.Wait()
+				panic(errors.AssertionFailedf("injected vecindex panic"))
+			},
+			BeforeVecIndexWait: func() {
+				pullDelayer.Done()
+			},
+		}
+		vectorMgr.SetTestingKnobs(&testingKnobs)
+
+		// Table 150 is created in setup but not pulled by any earlier subtest,
+		// so DuringVecIndexPull will fire (cached entries skip the inner
+		// closure).
+		const tableID catid.DescID = 150
+
+		// Fire 11 concurrent Get calls. The first to acquire m.mu installs the
+		// indexEntry and runs the inner closure, which panics. The other 10
+		// observe e.mustWait=true and block on e.waitCond. The cleanup defer
+		// in Manager.Get must unblock all 10 and delete the cache entry so a
+		// subsequent Get retries instead of hanging.
+		results := make([]error, 11)
+		wg := sync.WaitGroup{}
+		for i := range 11 {
+			wg.Add(1)
+			go func(idx int) {
+				defer wg.Done()
+				defer func() {
+					if r := recover(); r != nil {
+						if err, ok := r.(error); ok {
+							results[idx] = err
+						} else {
+							results[idx] = errors.Newf("non-error panic: %v", r)
+						}
+					}
+				}()
+				_, err := vectorMgr.Get(ctx, tableID, 2)
+				if err != nil {
+					results[idx] = err
+				}
+			}(i)
+		}
+		wg.Wait()
+
+		// The panicker propagates the raw injected panic; the 10 waiters get the
+		// wrapped sentinel, which contains the original cause as well. Check the
+		// sentinel substring first so a wrapped error isn't miscounted as the
+		// raw panic.
+		var injectedCount, wrappedCount int
+		for _, e := range results {
+			require.Error(t, e)
+			require.Contains(t, e.Error(), "injected vecindex panic",
+				"all errors should carry the original panic cause")
+			if strings.Contains(e.Error(), "vector index construction panicked") {
+				wrappedCount++
+			} else {
+				injectedCount++
+			}
+		}
+		require.Equal(t, 1, injectedCount,
+			"exactly one goroutine should propagate the original panic unwrapped")
+		require.Equal(t, 10, wrappedCount,
+			"waiters should receive the wrapped sentinel error")
+
+		// After cleanup, the cache entry must be gone so a second Get retries.
+		// Clear the panicking knob first so the retry can succeed.
+		vectorMgr.SetTestingKnobs(nil)
+		idx, err := vectorMgr.Get(ctx, tableID, 2)
+		require.NoError(t, err)
+		require.NotNil(t, idx)
+	})
 }
diff --git a/pkg/sql/vecindex/mutation_searcher.go b/pkg/sql/vecindex/mutation_searcher.go
index e935144cd693..3359722ab0b5 100644
--- a/pkg/sql/vecindex/mutation_searcher.go
+++ b/pkg/sql/vecindex/mutation_searcher.go
@@ -32,6 +32,7 @@ type MutationSearcher struct {
 	partitionKey tree.Datum
 	encoded      tree.Datum
 	evalCtx      *eval.Context
+	testingKnobs *VecIndexTestingKnobs
 }
 
 // Init wraps the given KV transaction in a C-SPANN transaction and initializes
@@ -61,6 +62,14 @@ func (s *MutationSearcher) KVStats() vecstore.KVStats {
 	return s.txn.KVStats()
 }
 
+// SetTestingKnobs configures testing knobs on the mutation searcher. Invoked
+// by executor processors after Init to propagate knobs from the Manager. The
+// pointer is nil in production, and a nil pointer is a no-op for subsequent
+// knob lookups.
+func (s *MutationSearcher) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
+	s.testingKnobs = knobs
+}
+
 // SearchForInsert triggers a search for the partition in which to insert the
 // input vector. The partition's key is returned by PartitionKey() and the
 // input vector's quantized and encoded form is returned by EncodedVector().
@@ -70,6 +79,10 @@ func (s *MutationSearcher) KVStats() vecstore.KVStats {
 func (s *MutationSearcher) SearchForInsert(
 	ctx context.Context, prefix roachpb.Key, vec vector.T,
 ) error {
+	if s.testingKnobs != nil && s.testingKnobs.PanicDuringMutationSearch != nil {
+		s.testingKnobs.PanicDuringMutationSearch()
+	}
+
 	res, err := s.idx.SearchForInsert(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec)
 	if err != nil {
 		return err
@@ -101,6 +114,10 @@ func (s *MutationSearcher) SearchForInsert(
 func (s *MutationSearcher) SearchForDelete(
 	ctx context.Context, prefix roachpb.Key, vec vector.T, key cspann.KeyBytes,
 ) error {
+	if s.testingKnobs != nil && s.testingKnobs.PanicDuringMutationSearch != nil {
+		s.testingKnobs.PanicDuringMutationSearch()
+	}
+
 	res, err := s.idx.SearchForDelete(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec, key)
 	if err != nil {
 		return err
diff --git a/pkg/sql/vecindex/searcher.go b/pkg/sql/vecindex/searcher.go
index ebe6e2739b8f..7c49171211e8 100644
--- a/pkg/sql/vecindex/searcher.go
+++ b/pkg/sql/vecindex/searcher.go
@@ -24,14 +24,15 @@ import (
 // NOTE: Searcher is intended to be embedded within an execution engine
 // processor object.
 type Searcher struct {
-	idx       *cspann.Index
-	txn       vecstore.Txn
-	idxCtx    cspann.Context
-	options   cspann.SearchOptions
-	searchSet cspann.SearchSet
-	results   cspann.SearchResults
-	resultIdx int
-	evalCtx   *eval.Context
+	idx          *cspann.Index
+	txn          vecstore.Txn
+	idxCtx       cspann.Context
+	options      cspann.SearchOptions
+	searchSet    cspann.SearchSet
+	results      cspann.SearchResults
+	resultIdx    int
+	evalCtx      *eval.Context
+	testingKnobs *VecIndexTestingKnobs
 }
 
 // Init wraps the given KV transaction in a C-SPANN transaction and initializes
@@ -65,6 +66,14 @@ func (s *Searcher) Init(
 		cspann.IncreaseRerankResults(baseBeamSize, maxResults, rerankMultiplier)
 }
 
+// SetTestingKnobs configures testing knobs on the searcher. Invoked by
+// executor processors after Init to propagate knobs from the Manager. The
+// pointer is nil in production, and a nil pointer is a no-op for subsequent
+// knob lookups.
+func (s *Searcher) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
+	s.testingKnobs = knobs
+}
+
 // Search triggers a search over the index for the given vector, within the
 // scope of the given prefix. "maxResults" specifies the maximum number of
 // results that will be returned.
@@ -72,6 +81,10 @@ func (s *Searcher) Init(
 // NOTE: The caller is assumed to own the memory for all parameters and can
 // reuse the memory after the call returns.
 func (s *Searcher) Search(ctx context.Context, prefix roachpb.Key, vec vector.T) error {
+	if s.testingKnobs != nil && s.testingKnobs.PanicDuringSearch != nil {
+		s.testingKnobs.PanicDuringSearch()
+	}
+
 	err := s.idx.Search(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec, &s.searchSet, s.options)
 	if err != nil {
 		return err
diff --git a/pkg/sql/vecindex/searcher_test.go b/pkg/sql/vecindex/searcher_test.go
index 61346fdbbd88..683df95fe23e 100644
--- a/pkg/sql/vecindex/searcher_test.go
+++ b/pkg/sql/vecindex/searcher_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
+	"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -33,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/vector"
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -215,3 +217,82 @@ func TestSearcher(t *testing.T) {
 
 	require.NoError(t, tx.Commit(ctx))
 }
+
+// TestVectorIndexPanicCaught verifies that panics originating in
+// pkg/sql/vecindex on the SQL executor path are caught by the colexecerror
+// allow-list and returned to the SQL client as internal errors, rather than
+// crashing the node. Subtests cover the read and mutation executor paths.
+//
+// The fact that the test process is still alive between subtests is the
+// implicit allow-list assertion: without the allow-list entry covering
+// pkg/sql/vecindex, the panic would re-propagate out of
+// CatchVectorizedRuntimeError and tear down the test binary.
+//
+// Regression check: commenting out the sqlVecindexPackagesPrefix line in
+// pkg/sql/colexecerror/error.go's shouldCatchPanic causes all subtests to
+// fail with uncaught panics (exit code 2). The cspann.Index.Search subtest
+// specifically panics from a subpackage, locking down the breadth of the
+// prefix match: that the allow-list covers pkg/sql/vecindex/... subpackages,
+// not just the top-level package.
+func TestVectorIndexPanicCaught(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	// crdb_test builds default to re-panicking from CatchVectorizedRuntimeError
+	// so that bugs in the vectorized engine fail loudly in tests. This test
+	// specifically exercises the production panic-catching path, so restore
+	// release-build behavior for its scope.
+	defer colexecerror.ProductionBehaviorForTests()()
+
+	ctx := context.Background()
+	knobs := &VecIndexTestingKnobs{}
+	srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			VecIndexTestingKnobs: knobs,
+		},
+	})
+	defer srv.Stopper().Stop(ctx)
+	runner := sqlutils.MakeSQLRunner(sqlDB)
+
+	runner.Exec(t, "SET CLUSTER SETTING feature.vector_index.enabled = true")
+	runner.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY, v VECTOR(2), VECTOR INDEX (v))")
+	// Seed rows so the SELECT subtests have something for the optimizer to
+	// plan a vector index scan against. This INSERT runs before any knob is
+	// set, so it completes normally.
+	runner.Exec(t, "INSERT INTO t VALUES (1, '[1, 2]'), (2, '[3, 4]')")
+
+	t.Run("MutationSearcher panic on INSERT", func(t *testing.T) {
+		knobs.PanicDuringMutationSearch = func() {
+			panic(errors.AssertionFailedf("injected MutationSearcher panic"))
+		}
+		defer func() { knobs.PanicDuringMutationSearch = nil }()
+
+		_, err := sqlDB.ExecContext(ctx, "INSERT INTO t VALUES (99, '[7, 8]')")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "injected MutationSearcher panic")
+		// Sanity: panic fires at the top of SearchForInsert, before any KV
+		// write, so the seed rows are intact.
+		runner.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"2"}})
+	})
+
+	t.Run("Searcher panic on SELECT", func(t *testing.T) {
+		knobs.PanicDuringSearch = func() {
+			panic(errors.AssertionFailedf("injected Searcher.Search panic"))
+		}
+		defer func() { knobs.PanicDuringSearch = nil }()
+
+		_, err := sqlDB.ExecContext(ctx, "SELECT k FROM t ORDER BY v <-> '[1, 2]' LIMIT 1")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "injected Searcher.Search panic")
+	})
+
+	t.Run("cspann.Index.Search panic on SELECT", func(t *testing.T) {
+		knobs.PanicDuringCspannSearch = func() {
+			panic(errors.AssertionFailedf("injected cspann.Index.Search panic"))
+		}
+		defer func() { knobs.PanicDuringCspannSearch = nil }()
+
+		_, err := sqlDB.ExecContext(ctx, "SELECT k FROM t ORDER BY v <-> '[1, 2]' LIMIT 1")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "injected cspann.Index.Search panic")
+	})
+}
diff --git a/pkg/sql/vecindex/testing_knobs.go b/pkg/sql/vecindex/testing_knobs.go
index 2356398fcfe0..bbd963a5e0ff 100644
--- a/pkg/sql/vecindex/testing_knobs.go
+++ b/pkg/sql/vecindex/testing_knobs.go
@@ -10,6 +10,19 @@ import "github.com/cockroachdb/cockroach/pkg/base"
 type VecIndexTestingKnobs struct {
 	DuringVecIndexPull func()
 	BeforeVecIndexWait func()
+
+	// PanicDuringSearch, if set, is invoked at the top of every call to
+	// Searcher.Search. Gate the closure on a counter to fire on a specific
+	// call only.
+	PanicDuringSearch func()
+
+	// PanicDuringMutationSearch, if set, is invoked at the top of every call
+	// to MutationSearcher.SearchForInsert and SearchForDelete.
+	PanicDuringMutationSearch func()
+
+	// PanicDuringCspannSearch, if set, is invoked at the top of
+	// cspann.Index.Search via IndexOptions.PanicDuringCspannSearch.
+	PanicDuringCspannSearch func()
 }
 
 var _ base.ModuleTestingKnobs = (*VecIndexTestingKnobs)(nil)