Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 127 additions & 24 deletions helix-db/src/helix_engine/vector_core/vector_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
utils::{id::uuid_str, properties::ImmutablePropertiesMap},
};
use heed3::{
Database, Env, RoTxn, RwTxn,
Database, Env, Error as HeedError, MdbError, PutFlags, RoTxn, RwTxn,
byteorder::BE,
types::{Bytes, U128, Unit},
};
Expand Down Expand Up @@ -144,21 +144,52 @@ impl VectorCore {
Ok(())
}

/// Stores `entry` as the entry point unless an entry point is already recorded.
///
/// The id of `entry` is written in big-endian form under `ENTRY_POINT_KEY` with
/// `PutFlags::NO_OVERWRITE`, so an existing value is never replaced. This keeps
/// a later writer from clobbering an entry point that was set before it.
///
/// Returns `Ok(true)` when the write succeeded and `Ok(false)` when the key was
/// already present (`MdbError::KeyExist`). Every other storage error is
/// converted into a `VectorError` and returned.
#[inline]
fn set_entry_point_if_not_exists(
    &self,
    txn: &mut RwTxn,
    entry: &HVector,
) -> Result<bool, VectorError> {
    let entry_id_bytes = entry.id.to_be_bytes();
    let put_outcome = self.vectors_db.put_with_flags(
        txn,
        PutFlags::NO_OVERWRITE,
        ENTRY_POINT_KEY,
        &entry_id_bytes,
    );

    match put_outcome {
        // The key already holds an entry point: report "not set" instead of failing.
        Err(HeedError::Mdb(MdbError::KeyExist)) => Ok(false),
        Err(other) => Err(VectorError::from(other)),
        Ok(_) => Ok(true),
    }
}

/// Put a vector into the database. This stores both the vector data (embeddings)
/// and the vector properties (metadata) within the same transaction.
///
/// Note: We serialize all data BEFORE doing any writes to ensure consistency.
/// If serialization fails, no partial writes occur.
#[inline(always)]
pub fn put_vector<'arena>(
&self,
txn: &mut RwTxn,
vector: &HVector<'arena>,
) -> Result<(), VectorError> {
// Serialize all data upfront BEFORE any writes.
// This ensures we don't do partial writes if serialization fails.
let vector_key = Self::vector_key(vector.id, vector.level);
let vector_data_bytes = vector.vector_data_to_bytes()?;
let properties_bytes = bincode::serialize(&vector)?;

// Now do both writes. A failure in either one is propagated to the caller with `?`;
// the partial write is only discarded once the caller aborts or drops the transaction.
self.vectors_db
.put(
txn,
&Self::vector_key(vector.id, vector.level),
vector.vector_data_to_bytes()?,
)
.put(txn, &vector_key, &vector_data_bytes)
.map_err(VectorError::from)?;
self.vector_properties_db
.put(txn, &vector.id, bincode::serialize(&vector)?.as_ref())?;
.put(txn, &vector.id, &properties_bytes)
.map_err(VectorError::from)?;
Ok(())
}

Expand Down Expand Up @@ -406,20 +437,34 @@ impl VectorCore {
Ok(vector)
}

/// Get a full vector (data + properties) from the database.
/// Handles potential inconsistency where one database has data but the other doesn't.
#[inline(always)]
pub fn get_full_vector<'arena>(
&self,
txn: &RoTxn,
id: u128,
arena: &'arena bumpalo::Bump,
) -> Result<HVector<'arena>, VectorError> {
let vector_data_bytes = self
.vectors_db
.get(txn, &Self::vector_key(id, 0))?
.ok_or(VectorError::VectorNotFound(uuid_str(id, arena).to_string()))?;
let vector_data_bytes = match self.vectors_db.get(txn, &Self::vector_key(id, 0))? {
Some(bytes) => bytes,
None => {
return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string()));
}
};

let properties_bytes = self.vector_properties_db.get(txn, &id)?;

// Defensive check: if we have vector data but no properties, the vector
// may be in an inconsistent state. Return not found rather than panicking.
if properties_bytes.is_none() && !vector_data_bytes.is_empty() {
debug_println!(
"Warning: vector {} has data but no properties (inconsistent state)",
uuid_str(id, arena)
);
return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string()));
}

let vector = HVector::from_bincode_bytes(arena, properties_bytes, vector_data_bytes, id)?;
if vector.deleted {
return Err(VectorError::VectorDeleted);
Expand Down Expand Up @@ -580,14 +625,21 @@ impl HNSW for VectorCore {

query.level = new_level;

// Use atomic set-if-not-exists to prevent race condition where multiple
// concurrent transactions try to set the entry point simultaneously.
// This fixes "double free or corruption" crashes under high concurrency.
let entry_point = match self.get_entry_point(txn, label, arena) {
Ok(ep) => ep,
Err(_) => {
// TODO: use proper error handling
self.set_entry_point(txn, &query)?;
query.set_distance(0.0);

return Ok(query);
// Atomically try to set entry point - only one transaction will succeed
if self.set_entry_point_if_not_exists(txn, &query)? {
// We won the race, we're the first vector
query.set_distance(0.0);
return Ok(query);
} else {
// Another transaction set the entry point first, re-read it
self.get_entry_point(txn, label, arena)?
}
}
};

Expand Down Expand Up @@ -620,20 +672,71 @@ impl HNSW for VectorCore {
self.select_neighbors::<F>(txn, label, &query, nearest, level, true, None, arena)?;
self.set_neighbours(txn, query.id, &neighbors, level)?;

// Update neighbors' connection lists to include the new vector.
// Use defensive error handling - if a neighbor was deleted/modified by another
// transaction, skip it rather than failing the entire insert.
for e in neighbors {
let id = e.id;
let e_conns = BinaryHeap::from(
arena,
self.get_neighbors::<F>(txn, label, id, level, None, arena)?,
);
let e_new_conn = self
.select_neighbors::<F>(txn, label, &query, e_conns, level, true, None, arena)?;
self.set_neighbours(txn, id, &e_new_conn, level)?;

// Try to get the neighbor's current connections - if this fails (e.g., neighbor
// was deleted by another transaction), skip updating this neighbor.
let e_conns = match self.get_neighbors::<F>(txn, label, id, level, None, arena) {
Ok(conns) => BinaryHeap::from(arena, conns),
Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => {
continue;
}
Err(VectorError::EntryPointNotFound) => {
continue;
}
Err(e) => return Err(e),
};

let e_new_conn = match self.select_neighbors::<F>(
txn, label, &query, e_conns, level, true, None, arena,
) {
Ok(conns) => conns,
Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => {
continue;
}
Err(e) => return Err(e),
};

// Update the neighbor's connections - if this fails due to concurrent modification,
// it's not critical (the graph will be slightly suboptimal but still functional)
if let Err(_e) = self.set_neighbours(txn, id, &e_new_conn, level) {
debug_println!("Warning: failed to update neighbor {}: {:?}", id, _e);
}
}
}

// Update entry point if our level is higher than what we observed at the start.
//
// Note: There is an inherent race condition here that cannot be fully resolved
// without atomic compare-and-swap semantics (which LMDB doesn't support for
// arbitrary values). Between our check and commit, another transaction may have
// set a higher-level entry point. In this rare case, our lower-level entry point
// will overwrite it. This is acceptable because:
// 1. High-level vectors are exponentially rare (~1/m probability per level)
// 2. The impact is only slightly slower search (more levels to traverse)
// 3. Search correctness is not affected
// 4. The next high-level insert will fix it
if new_level > l {
self.set_entry_point(txn, &query)?;
match self.get_entry_point(txn, label, arena) {
Ok(current_ep) => {
if new_level > current_ep.level {
self.set_entry_point(txn, &query)?;
}
}
Err(VectorError::EntryPointNotFound) => {
// Entry point was deleted (unusual), set ours
self.set_entry_point(txn, &query)?;
}
Err(_) => {
// I/O or other error reading entry point - don't risk overwriting
// a valid higher-level entry point. Skip this update; the next
// high-level insert will retry.
}
Comment on lines +734 to +738
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unconditionally setting entry point on any error (line 725) can overwrite a valid entry point if there's a temporary read error. Only set on EntryPointNotFound (line 722), not on other errors like database I/O failures.

}
}
Comment on lines 723 to 740
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entry point update logic has a race condition. Between reading current_ep on line 716 and setting on line 718, another transaction could set an even higher-level entry point. The check-then-set is not atomic. Consider using NO_OVERWRITE with entry point level encoding in the key, or accepting that the highest concurrent insert wins (which may be acceptable).


debug_println!("vector inserted with id {}", query.id);
Expand Down
Loading