diff --git a/helix-db/src/helix_engine/vector_core/vector_core.rs b/helix-db/src/helix_engine/vector_core/vector_core.rs index e463feea..4f0b97d1 100644 --- a/helix-db/src/helix_engine/vector_core/vector_core.rs +++ b/helix-db/src/helix_engine/vector_core/vector_core.rs @@ -13,7 +13,7 @@ use crate::{ utils::{id::uuid_str, properties::ImmutablePropertiesMap}, }; use heed3::{ - Database, Env, RoTxn, RwTxn, + Database, Env, Error as HeedError, MdbError, PutFlags, RoTxn, RwTxn, byteorder::BE, types::{Bytes, U128, Unit}, }; @@ -144,21 +144,52 @@ impl VectorCore { Ok(()) } + /// Atomically set the entry point only if it doesn't already exist. + /// Returns Ok(true) if we set it, Ok(false) if it already existed. + /// This prevents the race condition where multiple concurrent transactions + /// try to set the entry point simultaneously. + #[inline] + fn set_entry_point_if_not_exists( + &self, + txn: &mut RwTxn, + entry: &HVector, + ) -> Result { + match self.vectors_db.put_with_flags( + txn, + PutFlags::NO_OVERWRITE, + ENTRY_POINT_KEY, + &entry.id.to_be_bytes(), + ) { + Ok(_) => Ok(true), + Err(HeedError::Mdb(MdbError::KeyExist)) => Ok(false), + Err(e) => Err(VectorError::from(e)), + } + } + + /// Put a vector into the database. This stores both the vector data (embeddings) + /// and the vector properties (metadata) within the same transaction. + /// + /// Note: We serialize all data BEFORE doing any writes to ensure consistency. + /// If serialization fails, no partial writes occur. #[inline(always)] pub fn put_vector<'arena>( &self, txn: &mut RwTxn, vector: &HVector<'arena>, ) -> Result<(), VectorError> { + // Serialize all data upfront BEFORE any writes. + // This ensures we don't do partial writes if serialization fails. + let vector_key = Self::vector_key(vector.id, vector.level); + let vector_data_bytes = vector.vector_data_to_bytes()?; + let properties_bytes = bincode::serialize(&vector)?; + + // Now do both writes - if either fails, LMDB will roll back the transaction self.vectors_db - .put( - txn, - &Self::vector_key(vector.id, vector.level), - vector.vector_data_to_bytes()?, - ) + .put(txn, &vector_key, &vector_data_bytes) .map_err(VectorError::from)?; self.vector_properties_db - .put(txn, &vector.id, bincode::serialize(&vector)?.as_ref())?; + .put(txn, &vector.id, &properties_bytes) + .map_err(VectorError::from)?; Ok(()) } @@ -406,6 +437,8 @@ impl VectorCore { Ok(vector) } + /// Get a full vector (data + properties) from the database. + /// Handles potential inconsistency where one database has data but the other doesn't. #[inline(always)] pub fn get_full_vector<'arena>( &self, @@ -413,13 +446,25 @@ impl VectorCore { id: u128, arena: &'arena bumpalo::Bump, ) -> Result, VectorError> { - let vector_data_bytes = self - .vectors_db - .get(txn, &Self::vector_key(id, 0))? - .ok_or(VectorError::VectorNotFound(uuid_str(id, arena).to_string()))?; + let vector_data_bytes = match self.vectors_db.get(txn, &Self::vector_key(id, 0))? { + Some(bytes) => bytes, + None => { + return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string())); + } + }; let properties_bytes = self.vector_properties_db.get(txn, &id)?; + // Defensive check: if we have vector data but no properties, the vector + // may be in an inconsistent state. Return not found rather than panicking. + if properties_bytes.is_none() && !vector_data_bytes.is_empty() { + debug_println!( + "Warning: vector {} has data but no properties (inconsistent state)", + uuid_str(id, arena) + ); + return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string())); + } + let vector = HVector::from_bincode_bytes(arena, properties_bytes, vector_data_bytes, id)?; if vector.deleted { return Err(VectorError::VectorDeleted); @@ -580,14 +625,21 @@ impl HNSW for VectorCore { query.level = new_level; + // Use atomic set-if-not-exists to prevent race condition where multiple + // concurrent transactions try to set the entry point simultaneously. + // This fixes "double free or corruption" crashes under high concurrency. let entry_point = match self.get_entry_point(txn, label, arena) { Ok(ep) => ep, Err(_) => { - // TODO: use proper error handling - self.set_entry_point(txn, &query)?; - query.set_distance(0.0); - - return Ok(query); + // Atomically try to set entry point - only one transaction will succeed + if self.set_entry_point_if_not_exists(txn, &query)? { + // We won the race, we're the first vector + query.set_distance(0.0); + return Ok(query); + } else { + // Another transaction set the entry point first, re-read it + self.get_entry_point(txn, label, arena)? + } } }; @@ -620,20 +672,71 @@ impl HNSW for VectorCore { self.select_neighbors::(txn, label, &query, nearest, level, true, None, arena)?; self.set_neighbours(txn, query.id, &neighbors, level)?; + // Update neighbors' connection lists to include the new vector. + // Use defensive error handling - if a neighbor was deleted/modified by another + // transaction, skip it rather than failing the entire insert. for e in neighbors { let id = e.id; - let e_conns = BinaryHeap::from( - arena, - self.get_neighbors::(txn, label, id, level, None, arena)?, - ); - let e_new_conn = self - .select_neighbors::(txn, label, &query, e_conns, level, true, None, arena)?; - self.set_neighbours(txn, id, &e_new_conn, level)?; + + // Try to get the neighbor's current connections - if this fails (e.g., neighbor + // was deleted by another transaction), skip updating this neighbor. + let e_conns = match self.get_neighbors::(txn, label, id, level, None, arena) { + Ok(conns) => BinaryHeap::from(arena, conns), + Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => { + continue; + } + Err(VectorError::EntryPointNotFound) => { + continue; + } + Err(e) => return Err(e), + }; + + let e_new_conn = match self.select_neighbors::( + txn, label, &query, e_conns, level, true, None, arena, + ) { + Ok(conns) => conns, + Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => { + continue; + } + Err(e) => return Err(e), + }; + + // Update the neighbor's connections - if this fails due to concurrent modification, + // it's not critical (the graph will be slightly suboptimal but still functional) + if let Err(_e) = self.set_neighbours(txn, id, &e_new_conn, level) { + debug_println!("Warning: failed to update neighbor {}: {:?}", id, _e); + } } } + // Update entry point if our level is higher than what we observed at the start. + // + // Note: There is an inherent race condition here that cannot be fully resolved + // without atomic compare-and-swap semantics (which LMDB doesn't support for + // arbitrary values). Between our check and commit, another transaction may have + // set a higher-level entry point. In this rare case, our lower-level entry point + // will overwrite it. This is acceptable because: + // 1. High-level vectors are exponentially rare (~1/m probability per level) + // 2. The impact is only slightly slower search (more levels to traverse) + // 3. Search correctness is not affected + // 4. The next high-level insert will fix it if new_level > l { - self.set_entry_point(txn, &query)?; + match self.get_entry_point(txn, label, arena) { + Ok(current_ep) => { + if new_level > current_ep.level { + self.set_entry_point(txn, &query)?; + } + } + Err(VectorError::EntryPointNotFound) => { + // Entry point was deleted (unusual), set ours + self.set_entry_point(txn, &query)?; + } + Err(_) => { + // I/O or other error reading entry point - don't risk overwriting + // a valid higher-level entry point. Skip this update; the next + // high-level insert will retry. + } + } } debug_println!("vector inserted with id {}", query.id);