Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
rangeStatsFetcher := rangestats.NewFetcher(cfg.db)

vecIndexManager := vecindex.NewManager(ctx, cfg.stopper, &cfg.Settings.SV, codec, cfg.internalDB)

if vecIndexKnobs, ok := cfg.TestingKnobs.VecIndexTestingKnobs.(*vecindex.VecIndexTestingKnobs); ok && vecIndexKnobs != nil {
vecIndexManager.SetTestingKnobs(vecIndexKnobs)
}

cfg.registry.AddMetricStruct(vecIndexManager.Metrics())

// Set up the DistSQL server.
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colexecerror/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,13 @@ func CatchVectorizedRuntimeError(operation func()) (retErr error) {
// Multiple actual packages can have the same prefix as a single constant string
// defined below, but all of such packages are allowed to be caught from.
const (
colPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/col"
encodingPackagePrefix = "github.com/cockroachdb/cockroach/pkg/util/encoding"
execinfraPackagePrefix = "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
sqlColPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/col"
sqlRowPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/row"
sqlSemPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/sem"
colPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/col"
encodingPackagePrefix = "github.com/cockroachdb/cockroach/pkg/util/encoding"
execinfraPackagePrefix = "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
sqlColPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/col"
sqlRowPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/row"
sqlSemPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/sem"
sqlVecindexPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/vecindex"
// When running BenchmarkCatchVectorizedRuntimeError under bazel, the
// repository prefix is missing.
testSqlColPackagesPrefix = "pkg/sql/col"
Expand Down Expand Up @@ -217,6 +218,7 @@ func shouldCatchPanic(panicEmittedFrom string) bool {
strings.HasPrefix(panicEmittedFrom, sqlColPackagesPrefix) ||
strings.HasPrefix(panicEmittedFrom, sqlRowPackagesPrefix) ||
strings.HasPrefix(panicEmittedFrom, sqlSemPackagesPrefix) ||
strings.HasPrefix(panicEmittedFrom, sqlVecindexPackagesPrefix) ||
strings.HasPrefix(panicEmittedFrom, testSqlColPackagesPrefix)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/rowexec/vector_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func newVectorSearchProcessor(
rerankMultiplier := int(flowCtx.EvalCtx.SessionData().VectorSearchRerankMultiplier)
v.searcher.Init(flowCtx.EvalCtx,
idx, flowCtx.Txn, &spec.GetFullVectorsFetchSpec, searchBeamSize, maxResults, rerankMultiplier)
if mgr, ok := flowCtx.Cfg.VecIndexManager.(*vecindex.Manager); ok {
v.searcher.SetTestingKnobs(mgr.TestingKnobs())
}
colTypes := make([]*types.T, len(v.fetchSpec.FetchedColumns))
for i, col := range v.fetchSpec.FetchedColumns {
colTypes[i] = col.Type
Expand Down Expand Up @@ -276,6 +279,9 @@ func newVectorMutationSearchProcessor(
return nil, err
}
v.searcher.Init(flowCtx.EvalCtx, idx, flowCtx.Txn, &spec.GetFullVectorsFetchSpec)
if mgr, ok := flowCtx.Cfg.VecIndexManager.(*vecindex.Manager); ok {
v.searcher.SetTestingKnobs(mgr.TestingKnobs())
}

// Pass through the input columns, and add the partition column and optional
// quantized vector column.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/vecindex/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_test(
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/colexecerror",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
Expand All @@ -93,6 +94,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/vector",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
9 changes: 9 additions & 0 deletions pkg/sql/vecindex/cspann/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type IndexOptions struct {
// MaxDeleteAttempts controls the number of times Delete or SearchForDelete
// will retry after failed attempts to find a requested deletion vector.
MaxDeleteAttempts int
// PanicDuringCspannSearch, if non-nil, is invoked at the top of
// Index.Search. Used by tests in the parent vecindex package to inject
// panics that exercise the colexecerror allow-list for this subpackage.
// Always nil in production.
PanicDuringCspannSearch func()
}

// SearchOptions specifies options that apply to a particular search operation
Expand Down Expand Up @@ -490,6 +495,10 @@ func (vi *Index) Search(
searchSet *SearchSet,
options SearchOptions,
) error {
if vi.options.PanicDuringCspannSearch != nil {
vi.options.PanicDuringCspannSearch()
}

vi.setupContext(idxCtx, treeKey, LeafLevel, options)
idxCtx.query.InitOriginal(vi.quantizer.GetDistanceMetric(), vec, &vi.rot)
return vi.searchHelper(ctx, idxCtx, searchSet)
Expand Down
55 changes: 53 additions & 2 deletions pkg/sql/vecindex/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func (m *Manager) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
m.testingKnobs = knobs
}

// TestingKnobs returns the testing knobs configured on the manager, or nil if
// none have been set. Exposed so that executor processors can propagate the
// knobs to per-flow helpers such as Searcher.
func (m *Manager) TestingKnobs() *VecIndexTestingKnobs {
return m.testingKnobs
}

// Metrics returns a metric.Struct which holds metrics for all vector indexes
// maintained by the manager.
func (m *Manager) Metrics() metric.Struct {
Expand Down Expand Up @@ -121,6 +128,36 @@ func (m *Manager) Get(
e = &indexEntry{mustWait: true, waitCond: sync.Cond{L: &m.mu}}
m.mu.indexes[idxKey] = e

// If the index construction below panics, mustWait will still be true
// because the normal completion path (which sets mustWait=false and calls
// Broadcast) is skipped. Wrap the panic cause into e.err so parked waiters
// get a meaningful error, drop the cache entry so subsequent callers retry
// instead of hanging, then rethrow so colexecerror catches the original
// panic for the calling goroutine as usual.
defer func() {
if !e.mustWait {
return
}

e.mustWait = false

r := recover()
cause, ok := r.(error)
if !ok {
// Covers both nil (Goexit-style exit) and non-error panic values.
cause = errors.Newf("non-error panic value: %v", r)
}
e.err = errors.NewAssertionErrorWithWrappedErrf(
cause, "vector index construction panicked")

e.waitCond.Broadcast()
delete(m.mu.indexes, idxKey)

if r != nil {
panic(r)
}
}()

idx, err := func() (*cspann.Index, error) {
// Unlock while we build the index structure so that concurrent requests can be
// serviced. We've already set mustWait to true, so other requests will wait
Expand Down Expand Up @@ -167,13 +204,13 @@ func (m *Manager) Get(

if err != nil {
// Don't keep the index entry around, so that we retry the query.
m.mu.indexes[idxKey] = nil
delete(m.mu.indexes, idxKey)
}
return idx, err
}

func (m *Manager) getIndexOptions(config *vecpb.Config, readOnly bool) *cspann.IndexOptions {
return &cspann.IndexOptions{
opts := &cspann.IndexOptions{
RotAlgorithm: config.RotAlgorithm,
MinPartitionSize: int(config.MinPartitionSize),
MaxPartitionSize: int(config.MaxPartitionSize),
Expand All @@ -187,6 +224,20 @@ func (m *Manager) getIndexOptions(config *vecpb.Config, readOnly bool) *cspann.I
// Disable adaptive search until it's extended to work with vecstore.
DisableAdaptiveSearch: true,
}
if m.testingKnobs != nil {
// Install a lazy closure rather than copying the function pointer
// directly, because Manager caches *cspann.Index and its IndexOptions
// for the lifetime of the manager. Tests that mutate the knob fields
// after the index is first created (the common pattern in
// TestVectorIndexPanicCaught) would otherwise see the stale nil
// captured at creation time.
opts.PanicDuringCspannSearch = func() {
if panicFn := m.testingKnobs.PanicDuringCspannSearch; panicFn != nil {
panicFn()
}
}
}
return opts
}

func (m *Manager) getVecConfig(
Expand Down
80 changes: 80 additions & 0 deletions pkg/sql/vecindex/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package vecindex_test
import (
"context"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -240,4 +242,82 @@ func TestVectorManager(t *testing.T) {
}
vectorMgr.SetTestingKnobs(nil)
})

t.Run("test panic during pull recovers cleanly", func(t *testing.T) {
// Block the panicking goroutine until the other 10 callers are parked on
// e.waitCond, so the panic propagates exactly when 10 waiters depend on
// the cleanup defer in Manager.Get.
pullDelayer := sync.WaitGroup{}
pullDelayer.Add(10)
testingKnobs := vecindex.VecIndexTestingKnobs{
DuringVecIndexPull: func() {
pullDelayer.Wait()
panic(errors.AssertionFailedf("injected vecindex panic"))
},
BeforeVecIndexWait: func() {
pullDelayer.Done()
},
}
vectorMgr.SetTestingKnobs(&testingKnobs)

// Table 150 is created in setup but not pulled by any earlier subtest,
// so DuringVecIndexPull will fire (cached entries skip the inner
// closure).
const tableID catid.DescID = 150

// Fire 11 concurrent Get calls. The first to acquire m.mu installs the
// indexEntry and runs the inner closure, which panics. The other 10
// observe e.mustWait=true and block on e.waitCond. The cleanup defer
// in Manager.Get must unblock all 10 and delete the cache entry so a
// subsequent Get retries instead of hanging.
results := make([]error, 11)
wg := sync.WaitGroup{}
for i := range 11 {
wg.Add(1)
go func(idx int) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
if err, ok := r.(error); ok {
results[idx] = err
} else {
results[idx] = errors.Newf("non-error panic: %v", r)
}
}
}()
_, err := vectorMgr.Get(ctx, tableID, 2)
if err != nil {
results[idx] = err
}
}(i)
}
wg.Wait()

// The panicker propagates the raw injected panic; the 10 waiters get the
// wrapped sentinel, which contains the original cause as well. Check the
// sentinel substring first so a wrapped error isn't miscounted as the
// raw panic.
var injectedCount, wrappedCount int
for _, e := range results {
require.Error(t, e)
require.Contains(t, e.Error(), "injected vecindex panic",
"all errors should carry the original panic cause")
if strings.Contains(e.Error(), "vector index construction panicked") {
wrappedCount++
} else {
injectedCount++
}
}
require.Equal(t, 1, injectedCount,
"exactly one goroutine should propagate the original panic unwrapped")
require.Equal(t, 10, wrappedCount,
"waiters should receive the wrapped sentinel error")

// After cleanup, the cache entry must be gone so a second Get retries.
// Clear the panicking knob first so the retry can succeed.
vectorMgr.SetTestingKnobs(nil)
idx, err := vectorMgr.Get(ctx, tableID, 2)
require.NoError(t, err)
require.NotNil(t, idx)
})
}
17 changes: 17 additions & 0 deletions pkg/sql/vecindex/mutation_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type MutationSearcher struct {
partitionKey tree.Datum
encoded tree.Datum
evalCtx *eval.Context
testingKnobs *VecIndexTestingKnobs
}

// Init wraps the given KV transaction in a C-SPANN transaction and initializes
Expand Down Expand Up @@ -61,6 +62,14 @@ func (s *MutationSearcher) KVStats() vecstore.KVStats {
return s.txn.KVStats()
}

// SetTestingKnobs configures testing knobs on the mutation searcher. Invoked
// by executor processors after Init to propagate knobs from the Manager. The
// pointer is nil in production, and a nil pointer is a no-op for subsequent
// knob lookups.
func (s *MutationSearcher) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
s.testingKnobs = knobs
}

// SearchForInsert triggers a search for the partition in which to insert the
// input vector. The partition's key is returned by PartitionKey() and the
// input vector's quantized and encoded form is returned by EncodedVector().
Expand All @@ -70,6 +79,10 @@ func (s *MutationSearcher) KVStats() vecstore.KVStats {
func (s *MutationSearcher) SearchForInsert(
ctx context.Context, prefix roachpb.Key, vec vector.T,
) error {
if s.testingKnobs != nil && s.testingKnobs.PanicDuringMutationSearch != nil {
s.testingKnobs.PanicDuringMutationSearch()
}

res, err := s.idx.SearchForInsert(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec)
if err != nil {
return err
Expand Down Expand Up @@ -101,6 +114,10 @@ func (s *MutationSearcher) SearchForInsert(
func (s *MutationSearcher) SearchForDelete(
ctx context.Context, prefix roachpb.Key, vec vector.T, key cspann.KeyBytes,
) error {
if s.testingKnobs != nil && s.testingKnobs.PanicDuringMutationSearch != nil {
s.testingKnobs.PanicDuringMutationSearch()
}

res, err := s.idx.SearchForDelete(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec, key)
if err != nil {
return err
Expand Down
29 changes: 21 additions & 8 deletions pkg/sql/vecindex/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
// NOTE: Searcher is intended to be embedded within an execution engine
// processor object.
type Searcher struct {
idx *cspann.Index
txn vecstore.Txn
idxCtx cspann.Context
options cspann.SearchOptions
searchSet cspann.SearchSet
results cspann.SearchResults
resultIdx int
evalCtx *eval.Context
idx *cspann.Index
txn vecstore.Txn
idxCtx cspann.Context
options cspann.SearchOptions
searchSet cspann.SearchSet
results cspann.SearchResults
resultIdx int
evalCtx *eval.Context
testingKnobs *VecIndexTestingKnobs
}

// Init wraps the given KV transaction in a C-SPANN transaction and initializes
Expand Down Expand Up @@ -65,13 +66,25 @@ func (s *Searcher) Init(
cspann.IncreaseRerankResults(baseBeamSize, maxResults, rerankMultiplier)
}

// SetTestingKnobs configures testing knobs on the searcher. Invoked by
// executor processors after Init to propagate knobs from the Manager. The
// pointer is nil in production, and a nil pointer is a no-op for subsequent
// knob lookups.
func (s *Searcher) SetTestingKnobs(knobs *VecIndexTestingKnobs) {
s.testingKnobs = knobs
}

// Search triggers a search over the index for the given vector, within the
// scope of the given prefix. "maxResults" specifies the maximum number of
// results that will be returned.
//
// NOTE: The caller is assumed to own the memory for all parameters and can
// reuse the memory after the call returns.
func (s *Searcher) Search(ctx context.Context, prefix roachpb.Key, vec vector.T) error {
if s.testingKnobs != nil && s.testingKnobs.PanicDuringSearch != nil {
s.testingKnobs.PanicDuringSearch()
}

err := s.idx.Search(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec, &s.searchSet, s.options)
if err != nil {
return err
Expand Down
Loading
Loading