Skip to content

Commit 92c1d8d

Browse files
perf(hnsw): deduplicate neighbor updates and fix error handling (#9664)
## Summary - **Deduplicate `addNeighbors` calls**: Replace `nnUidArray` (which accumulated duplicate UIDs when the same neighbor appeared across multiple HNSW layers) with direct iteration over `inboundEdgesAllLayersMap` keys, ensuring each neighbor is processed exactly once - **Fix error handling for `addNeighbors(inUuid, ...)`**: The error was previously checked *after* the neighbor loop instead of immediately, allowing unnecessary work on failure - **Fix silent error swallowing**: `searchPersistentLayer` errors were returned as `layerErr` (always `nil`) instead of `err`, masking real failures ## Test plan - [ ] Verify existing HNSW vector index tests pass (`go test ./tok/hnsw/...`) - [ ] Confirm `addNeighbors` is called once per unique neighbor UID (no redundant writes) - [ ] Validate error propagation from `searchPersistentLayer` is no longer swallowed 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1b40e85 commit 92c1d8d

2 files changed

Lines changed: 88 additions & 14 deletions

File tree

tok/hnsw/persistent_hnsw.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
593593
// var nns []persistentHeapElement[T]
594594
visited := []persistentHeapElement[T]{}
595595
inLevel := getInsertLayer(ph.maxLevels) // calculate layer to insert node at (randomized every time)
596-
var layerErr error
597-
598596
for level := range inLevel {
599597
// perform insertion for layers [level, max_level) only, when level < inLevel just find better start
600598
err := ph.getVecFromUid(entry, tc, &startVec)
@@ -617,7 +615,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
617615

618616
var outboundEdgesAllLayers = make([][]uint64, ph.maxLevels)
619617
var inboundEdgesAllLayersMap = make(map[uint64][][]uint64)
620-
nnUidArray := []uint64{}
621618
for level := inLevel; level < ph.maxLevels; level++ {
622619
err := ph.getVecFromUid(entry, tc, &startVec)
623620
if err != nil {
@@ -626,38 +623,34 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
626623
layerResult, err := ph.searchPersistentLayer(tc, level, entry, startVec,
627624
inVec, false, ph.efConstruction, index.AcceptAll[T])
628625
if err != nil {
629-
return []persistentHeapElement[T]{}, []*index.KeyValue{}, layerErr
626+
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
630627
}
631628

632629
entry = layerResult.bestNeighbor().index
633630

634631
nns := layerResult.neighbors
635632
for i := range nns {
636-
nnUidArray = append(nnUidArray, nns[i].index)
637633
if inboundEdgesAllLayersMap[nns[i].index] == nil {
638634
inboundEdgesAllLayersMap[nns[i].index] = make([][]uint64, ph.maxLevels)
639635
}
640636
inboundEdgesAllLayersMap[nns[i].index][level] =
641637
append(inboundEdgesAllLayersMap[nns[i].index][level], inUuid)
642-
// add nn to outboundEdges.
643-
// These should already be correctly ordered.
644638
outboundEdgesAllLayers[level] =
645639
append(outboundEdgesAllLayers[level], nns[i].index)
646640
}
647641
}
648642
edge, err := ph.addNeighbors(ctx, tc, inUuid, outboundEdgesAllLayers)
649-
for i := range nnUidArray {
650-
edge, err := ph.addNeighbors(
651-
ctx, tc, nnUidArray[i], inboundEdgesAllLayersMap[nnUidArray[i]])
643+
if err != nil {
644+
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
645+
}
646+
edges = append(edges, edge)
647+
for nnUid, inboundEdges := range inboundEdgesAllLayersMap {
648+
edge, err := ph.addNeighbors(ctx, tc, nnUid, inboundEdges)
652649
if err != nil {
653650
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
654651
}
655652
edges = append(edges, edge)
656653
}
657-
if err != nil {
658-
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
659-
}
660-
edges = append(edges, edge)
661654

662655
return visited, edges, nil
663656
}

tok/hnsw/persistent_hnsw_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,87 @@ func TestNonflatEntryInsertToPersistentFlatStorage(t *testing.T) {
453453
}
454454
}
455455

456+
// TestInsertHelper_NoDuplicateNeighborEdges verifies that when the same neighbor
457+
// UID appears across multiple HNSW layers, insertHelper only produces one
458+
// addNeighbors call per unique neighbor UID (not one per layer appearance).
459+
//
460+
// Before the fix (sp/dedup-neighbor-updates), insertHelper accumulated neighbor
461+
// UIDs into nnUidArray with duplicates (same UID once per layer it appeared in),
462+
// causing redundant addNeighbors calls and wasted writes. The fix uses
463+
// inboundEdgesAllLayersMap (keyed by UID) to deduplicate.
464+
func TestInsertHelper_NoDuplicateNeighborEdges(t *testing.T) {
465+
emptyTsDbs()
466+
flatPh := flatPhs[0]
467+
flatPh.nodeAllEdges = make(map[uint64][][]uint64)
468+
469+
// Insert the entry node first.
470+
entryInsert := flatEntryInsert
471+
key := DataKey(flatPh.pred, entryInsert.inUuid)
472+
for i := range tsDbs {
473+
tsDbs[i].inMemTestDb[string(key[:])] = floatArrayAsBytes(entryInsert.inVec)
474+
}
475+
_, err := flatPh.Insert(context.TODO(), entryInsert.tc, entryInsert.inUuid, entryInsert.inVec)
476+
if err != nil {
477+
t.Fatalf("Failed to insert entry node: %v", err)
478+
}
479+
480+
// Insert a second node so the entry node becomes a neighbor.
481+
insert2 := insertToPersistentFlatStorageTest{
482+
tc: NewTxnCache(&inMemTxn{startTs: 12, commitTs: 40}, 12),
483+
inUuid: uint64(123),
484+
inVec: []float64{0.824, 0.319, 0.111},
485+
}
486+
key2 := DataKey(flatPh.pred, insert2.inUuid)
487+
for i := range tsDbs {
488+
tsDbs[i].inMemTestDb[string(key2[:])] = floatArrayAsBytes(insert2.inVec)
489+
}
490+
edges2, err := flatPh.Insert(context.TODO(), insert2.tc, insert2.inUuid, insert2.inVec)
491+
if err != nil {
492+
t.Fatalf("Failed to insert second node: %v", err)
493+
}
494+
495+
// Now insert a third node that should have the previous two as neighbors.
496+
insert3 := insertToPersistentFlatStorageTest{
497+
tc: NewTxnCache(&inMemTxn{startTs: 50, commitTs: 60}, 50),
498+
inUuid: uint64(456),
499+
inVec: []float64{0.5, 0.3, 0.2},
500+
}
501+
key3 := DataKey(flatPh.pred, insert3.inUuid)
502+
for i := range tsDbs {
503+
tsDbs[i].inMemTestDb[string(key3[:])] = floatArrayAsBytes(insert3.inVec)
504+
}
505+
edges3, err := flatPh.Insert(context.TODO(), insert3.tc, insert3.inUuid, insert3.inVec)
506+
if err != nil {
507+
t.Fatalf("Failed to insert third node: %v", err)
508+
}
509+
510+
// Count how many times each entity UID appears in the edges list.
511+
// With the dedup fix, each neighbor UID should appear at most once
512+
// (plus the inserted node itself appears once).
513+
edgeEntityCounts := make(map[uint64]int)
514+
for _, edge := range edges3 {
515+
edgeEntityCounts[edge.Entity]++
516+
}
517+
518+
for uid, count := range edgeEntityCounts {
519+
if count > 1 {
520+
t.Errorf("Entity UID %d appeared %d times in edges (expected at most 1). "+
521+
"This indicates duplicate addNeighbors calls for the same neighbor.", uid, count)
522+
}
523+
}
524+
525+
// Also verify edges2 has no duplicates for the same reason.
526+
edgeEntityCounts2 := make(map[uint64]int)
527+
for _, edge := range edges2 {
528+
edgeEntityCounts2[edge.Entity]++
529+
}
530+
for uid, count := range edgeEntityCounts2 {
531+
if count > 1 {
532+
t.Errorf("(insert2) Entity UID %d appeared %d times in edges (expected at most 1).", uid, count)
533+
}
534+
}
535+
}
536+
456537
type searchPersistentFlatStorageTest struct {
457538
qc *QueryCache
458539
query []float64

0 commit comments

Comments
 (0)