feat(gateway): add /bulk_import endpoint for batch node and edge creation#897
feat(gateway): add /bulk_import endpoint for batch node and edge creation#897financialvice wants to merge 1 commit into
Conversation
…tion Adds a built-in handler that creates nodes and edges in a single atomic LMDB write transaction, bypassing the query engine for high-throughput graph construction. Supports temp_id references for wiring edges within the same batch and existing UUID references for connecting to pre-existing nodes. Handles secondary indexes, BM25 full-text indexing, and schema version info.
| fn resolve_node_ref( | ||
| reference: &str, | ||
| temp_id_map: &HashMap<String, u128>, | ||
| ) -> Result<u128, GraphError> { | ||
| // Try UUID parse first | ||
| if let Ok(uuid) = uuid::Uuid::parse_str(reference) { | ||
| return Ok(uuid.as_u128()); | ||
| } | ||
| // Then try temp_id lookup | ||
| temp_id_map | ||
| .get(reference) | ||
| .copied() | ||
| .ok_or_else(|| GraphError::New(format!("Unknown node reference: '{reference}'"))) | ||
| } |
There was a problem hiding this comment.
UUID references accepted without existence check
resolve_node_ref accepts any syntactically valid UUID without verifying it corresponds to an actual node in the database. If a caller passes a plausible-looking but nonexistent UUID in from or to, the bulk import succeeds and silently writes dangling edge entries into out_edges_db and in_edges_db that point to a ghost node. Any subsequent traversal along such an edge will surface a NodeNotFound error (or silent data corruption depending on how the traversal is consumed).
This is particularly risky for a high-throughput import path where stale or mis-copied IDs are common. A simple db.nodes_db.get(&txn, &node_id)? presence check before accepting the UUID would prevent the inconsistency:
fn resolve_node_ref(
reference: &str,
temp_id_map: &HashMap<String, u128>,
db: &HelixGraphStorage,
txn: &heed3::RwTxn<'_>,
) -> Result<u128, GraphError> {
if let Ok(uuid) = uuid::Uuid::parse_str(reference) {
let id = uuid.as_u128();
// Verify the node actually exists before accepting the reference
if db.nodes_db.get(txn, &id)?.is_none() {
return Err(GraphError::New(format!(
"Node with UUID '{reference}' does not exist"
)));
}
return Ok(id);
}
temp_id_map
.get(reference)
.copied()
.ok_or_else(|| GraphError::New(format!("Unknown node reference: '{reference}'")))
}| fn sonic_value_to_value(v: &sonic_rs::Value) -> Value { | ||
| if let Some(s) = v.as_str() { | ||
| Value::String(s.to_string()) | ||
| } else if let Some(b) = v.as_bool() { | ||
| Value::Boolean(b) | ||
| } else if let Some(n) = v.as_i64() { | ||
| Value::I64(n) | ||
| } else if let Some(n) = v.as_u64() { | ||
| Value::U64(n) | ||
| } else if let Some(n) = v.as_f64() { | ||
| Value::F64(n) | ||
| } else if let Some(arr) = v.as_array() { | ||
| Value::Array(arr.iter().map(sonic_value_to_value).collect()) | ||
| } else if let Some(obj) = v.as_object() { | ||
| let mut map = HashMap::with_capacity(obj.len()); | ||
| for (k, val) in obj.iter() { | ||
| map.insert(k.to_string(), sonic_value_to_value(val)); | ||
| } | ||
| Value::Object(map) | ||
| } else { | ||
| Value::Empty | ||
| } |
There was a problem hiding this comment.
JSON
null values silently become Value::Empty
sonic_value_to_value falls through all type checks for null literals and returns Value::Empty, which is indistinguishable from an absent property at the call site. A caller sending { "name": null } expects the name key to be stored (even if empty/null), but it will behave the same as omitting the key entirely. This is silent data loss that could cause hard-to-diagnose issues for users explicitly nulling out a field.
Consider either returning an explicit error for null values (if null properties aren't supported) or adding a Value::Null variant, and documenting the behavior in the doc comment.
Description
Adds a
POST /bulk_importbuilt-in handler that creates nodes and edges in a single atomic LMDB write transaction, bypassing the query engine for high-throughput graph construction.nodesandedgesarrays in one requesttemp_idreferences for wiring edges to nodes created in the same batchfrom/toRelated Issues
Closes #896
Checklist when merging to main
rustfmthelix-cli/Cargo.tomlandhelixdb/Cargo.tomlAdditional Notes
Two files changed: new
bulk_import.rs(691 lines including 345 lines of tests) + one line added tomod.rs. No existing code modified.Greptile Summary
This PR adds a
POST /bulk_importbuilt-in handler that creates nodes and edges in a single atomic LMDB write transaction, bypassing the query engine for high-throughput graph construction. The implementation correctly handles secondary indexes (unique and non-unique), BM25 full-text indexing,temp_idwiring between batch-created nodes, and includes 10 tests covering happy paths, error paths, and round-trip persistence.Issues found:
resolve_node_refaccepts any syntactically valid UUID without verifying the corresponding node exists innodes_db. A nonexistent UUID will silently produce dangling edge entries inout_edges_db/in_edges_db, corrupting graph consistency. Anodes_db.get()existence check before accepting a UUID reference would prevent this.nullproperties silently dropped:sonic_value_to_valuemapsnulltoValue::Empty, which is indistinguishable from an absent property. Users explicitly nulling a field will lose that data without any error or warning.bumpalo::Bump::with_capacity(req.nodes.len() * 80)does not account for edge labels and property keys also allocated from the same arena.insert_bm25_node_docwrapper: The helper function is a single-line pass-through tobm25.insert_doc_for_node()and can be inlined at the call site.Important Files Changed
bulk_importmodule — trivially correct.Sequence Diagram
sequenceDiagram participant Client participant Gateway as Helix Gateway<br/>/bulk_import participant LMDB as LMDB (heed3) participant SecIdx as Secondary Indices participant BM25 as BM25 Index Client->>Gateway: POST /bulk_import {nodes[], edges[]} Gateway->>Gateway: Deserialize BulkImportRequest (sonic_rs) Gateway->>LMDB: Open exclusive write_txn loop Phase 1: For each node Gateway->>Gateway: Check duplicate temp_id Gateway->>Gateway: Convert JSON props → ImmutablePropertiesMap (arena) Gateway->>Gateway: Generate v6_uuid() + get_latest version Gateway->>LMDB: nodes_db.put(node_id, bincode(node)) Gateway->>SecIdx: put / put_with_flags(NO_OVERWRITE) per index Gateway->>BM25: insert_doc_for_node (if bm25 configured) Gateway->>Gateway: temp_id_map[temp_id] = node_id end loop Phase 2: For each edge Gateway->>Gateway: resolve_node_ref(from) — UUID parse OR temp_id lookup Gateway->>Gateway: resolve_node_ref(to) — UUID parse OR temp_id lookup Gateway->>Gateway: Generate v6_uuid() + get_latest version Gateway->>LMDB: edges_db.put(edge_id, bincode(edge)) Gateway->>LMDB: out_edges_db.put(out_key, pack_edge_data) Gateway->>LMDB: in_edges_db.put(in_key, pack_edge_data) end Gateway->>LMDB: txn.commit() Gateway->>Client: {node_ids, nodes_created, edges_created}Reviews (1): Last reviewed commit: "feat(gateway): add /bulk_import endpoint..." | Re-trigger Greptile