diff --git a/tok/hnsw/persistent_hnsw.go b/tok/hnsw/persistent_hnsw.go index 6ebc876a86b..5658800e579 100644 --- a/tok/hnsw/persistent_hnsw.go +++ b/tok/hnsw/persistent_hnsw.go @@ -593,8 +593,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, // var nns []persistentHeapElement[T] visited := []persistentHeapElement[T]{} inLevel := getInsertLayer(ph.maxLevels) // calculate layer to insert node at (randomized every time) - var layerErr error - for level := range inLevel { // perform insertion for layers [level, max_level) only, when level < inLevel just find better start err := ph.getVecFromUid(entry, tc, &startVec) @@ -617,7 +615,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, var outboundEdgesAllLayers = make([][]uint64, ph.maxLevels) var inboundEdgesAllLayersMap = make(map[uint64][][]uint64) - nnUidArray := []uint64{} for level := inLevel; level < ph.maxLevels; level++ { err := ph.getVecFromUid(entry, tc, &startVec) if err != nil { @@ -626,38 +623,34 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, layerResult, err := ph.searchPersistentLayer(tc, level, entry, startVec, inVec, false, ph.efConstruction, index.AcceptAll[T]) if err != nil { - return []persistentHeapElement[T]{}, []*index.KeyValue{}, layerErr + return []persistentHeapElement[T]{}, []*index.KeyValue{}, err } entry = layerResult.bestNeighbor().index nns := layerResult.neighbors for i := range nns { - nnUidArray = append(nnUidArray, nns[i].index) if inboundEdgesAllLayersMap[nns[i].index] == nil { inboundEdgesAllLayersMap[nns[i].index] = make([][]uint64, ph.maxLevels) } inboundEdgesAllLayersMap[nns[i].index][level] = append(inboundEdgesAllLayersMap[nns[i].index][level], inUuid) - // add nn to outboundEdges. - // These should already be correctly ordered. outboundEdgesAllLayers[level] = append(outboundEdgesAllLayers[level], nns[i].index) } } edge, err := ph.addNeighbors(ctx, tc, inUuid, outboundEdgesAllLayers) - for i := range nnUidArray { - edge, err := ph.addNeighbors( - ctx, tc, nnUidArray[i], inboundEdgesAllLayersMap[nnUidArray[i]]) + if err != nil { + return []persistentHeapElement[T]{}, []*index.KeyValue{}, err + } + edges = append(edges, edge) + for nnUid, inboundEdges := range inboundEdgesAllLayersMap { + edge, err := ph.addNeighbors(ctx, tc, nnUid, inboundEdges) if err != nil { return []persistentHeapElement[T]{}, []*index.KeyValue{}, err } edges = append(edges, edge) } - if err != nil { - return []persistentHeapElement[T]{}, []*index.KeyValue{}, err - } - edges = append(edges, edge) return visited, edges, nil } diff --git a/tok/hnsw/persistent_hnsw_test.go b/tok/hnsw/persistent_hnsw_test.go index e1ca82a291c..2a27ac99e7c 100644 --- a/tok/hnsw/persistent_hnsw_test.go +++ b/tok/hnsw/persistent_hnsw_test.go @@ -453,6 +453,87 @@ func TestNonflatEntryInsertToPersistentFlatStorage(t *testing.T) { } } +// TestInsertHelper_NoDuplicateNeighborEdges verifies that when the same neighbor +// UID appears across multiple HNSW layers, insertHelper only produces one +// addNeighbors call per unique neighbor UID (not one per layer appearance). +// +// Before the fix (sp/dedup-neighbor-updates), insertHelper accumulated neighbor +// UIDs into nnUidArray with duplicates (same UID once per layer it appeared in), +// causing redundant addNeighbors calls and wasted writes. The fix uses +// inboundEdgesAllLayersMap (keyed by UID) to deduplicate. +func TestInsertHelper_NoDuplicateNeighborEdges(t *testing.T) { + emptyTsDbs() + flatPh := flatPhs[0] + flatPh.nodeAllEdges = make(map[uint64][][]uint64) + + // Insert the entry node first. + entryInsert := flatEntryInsert + key := DataKey(flatPh.pred, entryInsert.inUuid) + for i := range tsDbs { + tsDbs[i].inMemTestDb[string(key[:])] = floatArrayAsBytes(entryInsert.inVec) + } + _, err := flatPh.Insert(context.TODO(), entryInsert.tc, entryInsert.inUuid, entryInsert.inVec) + if err != nil { + t.Fatalf("Failed to insert entry node: %v", err) + } + + // Insert a second node so the entry node becomes a neighbor. + insert2 := insertToPersistentFlatStorageTest{ + tc: NewTxnCache(&inMemTxn{startTs: 12, commitTs: 40}, 12), + inUuid: uint64(123), + inVec: []float64{0.824, 0.319, 0.111}, + } + key2 := DataKey(flatPh.pred, insert2.inUuid) + for i := range tsDbs { + tsDbs[i].inMemTestDb[string(key2[:])] = floatArrayAsBytes(insert2.inVec) + } + edges2, err := flatPh.Insert(context.TODO(), insert2.tc, insert2.inUuid, insert2.inVec) + if err != nil { + t.Fatalf("Failed to insert second node: %v", err) + } + + // Now insert a third node that should have the previous two as neighbors. + insert3 := insertToPersistentFlatStorageTest{ + tc: NewTxnCache(&inMemTxn{startTs: 50, commitTs: 60}, 50), + inUuid: uint64(456), + inVec: []float64{0.5, 0.3, 0.2}, + } + key3 := DataKey(flatPh.pred, insert3.inUuid) + for i := range tsDbs { + tsDbs[i].inMemTestDb[string(key3[:])] = floatArrayAsBytes(insert3.inVec) + } + edges3, err := flatPh.Insert(context.TODO(), insert3.tc, insert3.inUuid, insert3.inVec) + if err != nil { + t.Fatalf("Failed to insert third node: %v", err) + } + + // Count how many times each entity UID appears in the edges list. + // With the dedup fix, each neighbor UID should appear at most once + // (plus the inserted node itself appears once). + edgeEntityCounts := make(map[uint64]int) + for _, edge := range edges3 { + edgeEntityCounts[edge.Entity]++ + } + + for uid, count := range edgeEntityCounts { + if count > 1 { + t.Errorf("Entity UID %d appeared %d times in edges (expected at most 1). "+ + "This indicates duplicate addNeighbors calls for the same neighbor.", uid, count) + } + } + + // Also verify edges2 has no duplicates for the same reason. + edgeEntityCounts2 := make(map[uint64]int) + for _, edge := range edges2 { + edgeEntityCounts2[edge.Entity]++ + } + for uid, count := range edgeEntityCounts2 { + if count > 1 { + t.Errorf("(insert2) Entity UID %d appeared %d times in edges (expected at most 1).", uid, count) + } + } +} + type searchPersistentFlatStorageTest struct { qc *QueryCache query []float64