Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions tok/hnsw/persistent_hnsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
// var nns []persistentHeapElement[T]
visited := []persistentHeapElement[T]{}
inLevel := getInsertLayer(ph.maxLevels) // calculate layer to insert node at (randomized every time)
var layerErr error

for level := range inLevel {
// perform insertion for layers [level, max_level) only, when level < inLevel just find better start
err := ph.getVecFromUid(entry, tc, &startVec)
Expand All @@ -617,7 +615,6 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,

var outboundEdgesAllLayers = make([][]uint64, ph.maxLevels)
var inboundEdgesAllLayersMap = make(map[uint64][][]uint64)
nnUidArray := []uint64{}
for level := inLevel; level < ph.maxLevels; level++ {
err := ph.getVecFromUid(entry, tc, &startVec)
if err != nil {
Expand All @@ -626,38 +623,34 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
layerResult, err := ph.searchPersistentLayer(tc, level, entry, startVec,
inVec, false, ph.efConstruction, index.AcceptAll[T])
if err != nil {
return []persistentHeapElement[T]{}, []*index.KeyValue{}, layerErr
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
}

entry = layerResult.bestNeighbor().index

nns := layerResult.neighbors
for i := range nns {
nnUidArray = append(nnUidArray, nns[i].index)
if inboundEdgesAllLayersMap[nns[i].index] == nil {
inboundEdgesAllLayersMap[nns[i].index] = make([][]uint64, ph.maxLevels)
}
inboundEdgesAllLayersMap[nns[i].index][level] =
append(inboundEdgesAllLayersMap[nns[i].index][level], inUuid)
// add nn to outboundEdges.
// These should already be correctly ordered.
outboundEdgesAllLayers[level] =
append(outboundEdgesAllLayers[level], nns[i].index)
}
}
edge, err := ph.addNeighbors(ctx, tc, inUuid, outboundEdgesAllLayers)
for i := range nnUidArray {
edge, err := ph.addNeighbors(
ctx, tc, nnUidArray[i], inboundEdgesAllLayersMap[nnUidArray[i]])
if err != nil {
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
}
edges = append(edges, edge)
for nnUid, inboundEdges := range inboundEdgesAllLayersMap {
edge, err := ph.addNeighbors(ctx, tc, nnUid, inboundEdges)
if err != nil {
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
}
edges = append(edges, edge)
}
if err != nil {
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
}
edges = append(edges, edge)

return visited, edges, nil
}
81 changes: 81 additions & 0 deletions tok/hnsw/persistent_hnsw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,87 @@ func TestNonflatEntryInsertToPersistentFlatStorage(t *testing.T) {
}
}

// TestInsertHelper_NoDuplicateNeighborEdges verifies that when the same neighbor
// UID appears across multiple HNSW layers, insertHelper only produces one
// addNeighbors call per unique neighbor UID (not one per layer appearance).
//
// Before the fix (sp/dedup-neighbor-updates), insertHelper accumulated neighbor
// UIDs into nnUidArray with duplicates (same UID once per layer it appeared in),
// causing redundant addNeighbors calls and wasted writes. The fix uses
// inboundEdgesAllLayersMap (keyed by UID) to deduplicate.
func TestInsertHelper_NoDuplicateNeighborEdges(t *testing.T) {
emptyTsDbs()
flatPh := flatPhs[0]
flatPh.nodeAllEdges = make(map[uint64][][]uint64)

// Insert the entry node first.
entryInsert := flatEntryInsert
key := DataKey(flatPh.pred, entryInsert.inUuid)
for i := range tsDbs {
tsDbs[i].inMemTestDb[string(key[:])] = floatArrayAsBytes(entryInsert.inVec)
}
_, err := flatPh.Insert(context.TODO(), entryInsert.tc, entryInsert.inUuid, entryInsert.inVec)
if err != nil {
t.Fatalf("Failed to insert entry node: %v", err)
}

// Insert a second node so the entry node becomes a neighbor.
insert2 := insertToPersistentFlatStorageTest{
tc: NewTxnCache(&inMemTxn{startTs: 12, commitTs: 40}, 12),
inUuid: uint64(123),
inVec: []float64{0.824, 0.319, 0.111},
}
key2 := DataKey(flatPh.pred, insert2.inUuid)
for i := range tsDbs {
tsDbs[i].inMemTestDb[string(key2[:])] = floatArrayAsBytes(insert2.inVec)
}
edges2, err := flatPh.Insert(context.TODO(), insert2.tc, insert2.inUuid, insert2.inVec)
if err != nil {
t.Fatalf("Failed to insert second node: %v", err)
}

// Now insert a third node that should have the previous two as neighbors.
insert3 := insertToPersistentFlatStorageTest{
tc: NewTxnCache(&inMemTxn{startTs: 50, commitTs: 60}, 50),
inUuid: uint64(456),
inVec: []float64{0.5, 0.3, 0.2},
}
key3 := DataKey(flatPh.pred, insert3.inUuid)
for i := range tsDbs {
tsDbs[i].inMemTestDb[string(key3[:])] = floatArrayAsBytes(insert3.inVec)
}
edges3, err := flatPh.Insert(context.TODO(), insert3.tc, insert3.inUuid, insert3.inVec)
if err != nil {
t.Fatalf("Failed to insert third node: %v", err)
}

// Count how many times each entity UID appears in the edges list.
// With the dedup fix, each neighbor UID should appear at most once
// (plus the inserted node itself appears once).
edgeEntityCounts := make(map[uint64]int)
for _, edge := range edges3 {
edgeEntityCounts[edge.Entity]++
}

for uid, count := range edgeEntityCounts {
if count > 1 {
t.Errorf("Entity UID %d appeared %d times in edges (expected at most 1). "+
"This indicates duplicate addNeighbors calls for the same neighbor.", uid, count)
}
}

// Also verify edges2 has no duplicates for the same reason.
edgeEntityCounts2 := make(map[uint64]int)
for _, edge := range edges2 {
edgeEntityCounts2[edge.Entity]++
}
for uid, count := range edgeEntityCounts2 {
if count > 1 {
t.Errorf("(insert2) Entity UID %d appeared %d times in edges (expected at most 1).", uid, count)
}
}
}

type searchPersistentFlatStorageTest struct {
qc *QueryCache
query []float64
Expand Down