-
-
Notifications
You must be signed in to change notification settings - Fork 246
fix(vector_core): prevent race conditions in concurrent HNSW writes #827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
47c7e22
54303d1
b72add7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,7 @@ use crate::{ | |
| utils::{id::uuid_str, properties::ImmutablePropertiesMap}, | ||
| }; | ||
| use heed3::{ | ||
| Database, Env, RoTxn, RwTxn, | ||
| Database, Env, Error as HeedError, MdbError, PutFlags, RoTxn, RwTxn, | ||
| byteorder::BE, | ||
| types::{Bytes, U128, Unit}, | ||
| }; | ||
|
|
@@ -144,21 +144,52 @@ impl VectorCore { | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Atomically set the entry point only if it doesn't already exist. | ||
| /// Returns Ok(true) if we set it, Ok(false) if it already existed. | ||
| /// This prevents the race condition where multiple concurrent transactions | ||
| /// try to set the entry point simultaneously. | ||
| #[inline] | ||
| fn set_entry_point_if_not_exists( | ||
| &self, | ||
| txn: &mut RwTxn, | ||
| entry: &HVector, | ||
| ) -> Result<bool, VectorError> { | ||
| match self.vectors_db.put_with_flags( | ||
| txn, | ||
| PutFlags::NO_OVERWRITE, | ||
| ENTRY_POINT_KEY, | ||
| &entry.id.to_be_bytes(), | ||
| ) { | ||
| Ok(_) => Ok(true), | ||
| Err(HeedError::Mdb(MdbError::KeyExist)) => Ok(false), | ||
| Err(e) => Err(VectorError::from(e)), | ||
| } | ||
| } | ||
|
|
||
| /// Put a vector into the database. This stores both the vector data (embeddings) | ||
| /// and the vector properties (metadata) within the same transaction. | ||
| /// | ||
| /// Note: We serialize all data BEFORE doing any writes to ensure consistency. | ||
| /// If serialization fails, no partial writes occur. | ||
| #[inline(always)] | ||
| pub fn put_vector<'arena>( | ||
| &self, | ||
| txn: &mut RwTxn, | ||
| vector: &HVector<'arena>, | ||
| ) -> Result<(), VectorError> { | ||
| // Serialize all data upfront BEFORE any writes. | ||
| // This ensures we don't do partial writes if serialization fails. | ||
| let vector_key = Self::vector_key(vector.id, vector.level); | ||
| let vector_data_bytes = vector.vector_data_to_bytes()?; | ||
| let properties_bytes = bincode::serialize(&vector)?; | ||
|
|
||
| // Now do both writes - if either fails, LMDB will roll back the transaction | ||
| self.vectors_db | ||
| .put( | ||
| txn, | ||
| &Self::vector_key(vector.id, vector.level), | ||
| vector.vector_data_to_bytes()?, | ||
| ) | ||
| .put(txn, &vector_key, &vector_data_bytes) | ||
| .map_err(VectorError::from)?; | ||
| self.vector_properties_db | ||
| .put(txn, &vector.id, bincode::serialize(&vector)?.as_ref())?; | ||
| .put(txn, &vector.id, &properties_bytes) | ||
| .map_err(VectorError::from)?; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -406,20 +437,34 @@ impl VectorCore { | |
| Ok(vector) | ||
| } | ||
|
|
||
| /// Get a full vector (data + properties) from the database. | ||
| /// Handles potential inconsistency where one database has data but the other doesn't. | ||
| #[inline(always)] | ||
| pub fn get_full_vector<'arena>( | ||
| &self, | ||
| txn: &RoTxn, | ||
| id: u128, | ||
| arena: &'arena bumpalo::Bump, | ||
| ) -> Result<HVector<'arena>, VectorError> { | ||
| let vector_data_bytes = self | ||
| .vectors_db | ||
| .get(txn, &Self::vector_key(id, 0))? | ||
| .ok_or(VectorError::VectorNotFound(uuid_str(id, arena).to_string()))?; | ||
| let vector_data_bytes = match self.vectors_db.get(txn, &Self::vector_key(id, 0))? { | ||
| Some(bytes) => bytes, | ||
| None => { | ||
| return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string())); | ||
| } | ||
| }; | ||
|
|
||
| let properties_bytes = self.vector_properties_db.get(txn, &id)?; | ||
|
|
||
| // Defensive check: if we have vector data but no properties, the vector | ||
| // may be in an inconsistent state. Return not found rather than panicking. | ||
| if properties_bytes.is_none() && !vector_data_bytes.is_empty() { | ||
| debug_println!( | ||
| "Warning: vector {} has data but no properties (inconsistent state)", | ||
| uuid_str(id, arena) | ||
| ); | ||
| return Err(VectorError::VectorNotFound(uuid_str(id, arena).to_string())); | ||
| } | ||
|
|
||
| let vector = HVector::from_bincode_bytes(arena, properties_bytes, vector_data_bytes, id)?; | ||
| if vector.deleted { | ||
| return Err(VectorError::VectorDeleted); | ||
|
|
@@ -580,14 +625,21 @@ impl HNSW for VectorCore { | |
|
|
||
| query.level = new_level; | ||
|
|
||
| // Use atomic set-if-not-exists to prevent race condition where multiple | ||
| // concurrent transactions try to set the entry point simultaneously. | ||
| // This fixes "double free or corruption" crashes under high concurrency. | ||
| let entry_point = match self.get_entry_point(txn, label, arena) { | ||
| Ok(ep) => ep, | ||
| Err(_) => { | ||
| // TODO: use proper error handling | ||
| self.set_entry_point(txn, &query)?; | ||
| query.set_distance(0.0); | ||
|
|
||
| return Ok(query); | ||
| // Atomically try to set entry point - only one transaction will succeed | ||
| if self.set_entry_point_if_not_exists(txn, &query)? { | ||
| // We won the race, we're the first vector | ||
| query.set_distance(0.0); | ||
| return Ok(query); | ||
| } else { | ||
| // Another transaction set the entry point first, re-read it | ||
| self.get_entry_point(txn, label, arena)? | ||
| } | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -620,20 +672,71 @@ impl HNSW for VectorCore { | |
| self.select_neighbors::<F>(txn, label, &query, nearest, level, true, None, arena)?; | ||
| self.set_neighbours(txn, query.id, &neighbors, level)?; | ||
|
|
||
| // Update neighbors' connection lists to include the new vector. | ||
| // Use defensive error handling - if a neighbor was deleted/modified by another | ||
| // transaction, skip it rather than failing the entire insert. | ||
| for e in neighbors { | ||
| let id = e.id; | ||
| let e_conns = BinaryHeap::from( | ||
| arena, | ||
| self.get_neighbors::<F>(txn, label, id, level, None, arena)?, | ||
| ); | ||
| let e_new_conn = self | ||
| .select_neighbors::<F>(txn, label, &query, e_conns, level, true, None, arena)?; | ||
| self.set_neighbours(txn, id, &e_new_conn, level)?; | ||
|
|
||
| // Try to get the neighbor's current connections - if this fails (e.g., neighbor | ||
| // was deleted by another transaction), skip updating this neighbor. | ||
| let e_conns = match self.get_neighbors::<F>(txn, label, id, level, None, arena) { | ||
| Ok(conns) => BinaryHeap::from(arena, conns), | ||
| Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => { | ||
| continue; | ||
| } | ||
| Err(VectorError::EntryPointNotFound) => { | ||
| continue; | ||
| } | ||
| Err(e) => return Err(e), | ||
| }; | ||
|
|
||
| let e_new_conn = match self.select_neighbors::<F>( | ||
| txn, label, &query, e_conns, level, true, None, arena, | ||
| ) { | ||
| Ok(conns) => conns, | ||
| Err(VectorError::VectorNotFound(_)) | Err(VectorError::VectorDeleted) => { | ||
| continue; | ||
| } | ||
| Err(e) => return Err(e), | ||
| }; | ||
|
|
||
| // Update the neighbor's connections - if this fails due to concurrent modification, | ||
| // it's not critical (the graph will be slightly suboptimal but still functional) | ||
| if let Err(_e) = self.set_neighbours(txn, id, &e_new_conn, level) { | ||
| debug_println!("Warning: failed to update neighbor {}: {:?}", id, _e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Update entry point if our level is higher than what we observed at the start. | ||
| // | ||
| // Note: There is an inherent race condition here that cannot be fully resolved | ||
| // without atomic compare-and-swap semantics (which LMDB doesn't support for | ||
| // arbitrary values). Between our check and commit, another transaction may have | ||
| // set a higher-level entry point. In this rare case, our lower-level entry point | ||
| // will overwrite it. This is acceptable because: | ||
| // 1. High-level vectors are exponentially rare (~1/m probability per level) | ||
| // 2. The impact is only slightly slower search (more levels to traverse) | ||
| // 3. Search correctness is not affected | ||
| // 4. The next high-level insert will fix it | ||
| if new_level > l { | ||
| self.set_entry_point(txn, &query)?; | ||
| match self.get_entry_point(txn, label, arena) { | ||
| Ok(current_ep) => { | ||
| if new_level > current_ep.level { | ||
| self.set_entry_point(txn, &query)?; | ||
| } | ||
| } | ||
| Err(VectorError::EntryPointNotFound) => { | ||
| // Entry point was deleted (unusual), set ours | ||
| self.set_entry_point(txn, &query)?; | ||
| } | ||
| Err(_) => { | ||
| // I/O or other error reading entry point - don't risk overwriting | ||
| // a valid higher-level entry point. Skip this update; the next | ||
| // high-level insert will retry. | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
723
to
740
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entry point update logic has a race condition. Between reading |
||
|
|
||
| debug_println!("vector inserted with id {}", query.id); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unconditionally setting entry point on any error (line 725) can overwrite a valid entry point if there's a temporary read error. Only set on
EntryPointNotFound(line 722), not on other errors like database I/O failures.