Skip to content

Commit cab20b7

Browse files
committed
added local
1 parent 576a274 commit cab20b7

10 files changed

Lines changed: 396 additions & 386 deletions

File tree

README.md

Lines changed: 117 additions & 377 deletions
Large diffs are not rendered by default.

node/src/api.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,14 @@ pub struct EventProofResponse {
166166
pub event_count: u64,
167167
pub committed_height: u64,
168168
}
169+
170+
// Phase 34: Batch Ingestion
171+
/// JSON body accepted by the batch-insert HTTP handler.
#[derive(Deserialize, Serialize, Debug)]
172+
pub struct BatchInsertRequest {
173+
/// Vectors to insert, one inner `Vec<f32>` per record.
pub batch: Vec<Vec<f32>>,
174+
}
175+
176+
/// JSON body returned by the batch-insert HTTP handler.
#[derive(Serialize, Deserialize, Debug)]
177+
pub struct BatchInsertResponse {
178+
/// Record IDs assigned to the inserted vectors.
pub ids: Vec<u32>,
179+
}

node/src/engine.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,105 @@ impl<const MAX_RECORDS: usize, const D: usize, const MAX_NODES: usize, const MAX
268268
}
269269
}
270270

271+
/// Insert a batch of records in a single atomic transaction.
272+
/// Returns the list of assigned IDs.
273+
pub fn insert_batch(&mut self, batch_values: &[Vec<f32>]) -> Result<Vec<u32>, EngineError> {
274+
if batch_values.is_empty() {
275+
return Ok(Vec::new());
276+
}
277+
278+
// Validate all inputs first
279+
for values in batch_values {
280+
if values.len() != D {
281+
return Err(EngineError::InvalidInput(format!("Expected {} dimensions, got {}", D, values.len())));
282+
}
283+
for &v in values {
284+
if v > MAX_SAFE_F || v < MIN_SAFE_F {
285+
return Err(EngineError::InvalidInput(format!(
286+
"Embedding value {} out of allowed range [{:.1}, {:.1}]",
287+
v, MIN_SAFE_F, MAX_SAFE_F
288+
)));
289+
}
290+
}
291+
}
292+
293+
// Prepare events
294+
// Phase 23: Event-sourced path ONLY (Batching not supported in legacy WAL)
295+
if let Some(ref mut committer) = self.event_committer {
296+
let mut events = Vec::with_capacity(batch_values.len());
297+
let mut assigned_ids = Vec::with_capacity(batch_values.len());
298+
299+
// Track used IDs to avoid collisions within the batch
300+
let mut temp_used_ids = std::collections::HashSet::new();
301+
302+
// ID Generation Logic (Provisioning)
303+
let mut next_candidate = 0;
304+
305+
for values in batch_values {
306+
// Find next free slot
307+
let mut found_id = None;
308+
for i in next_candidate..MAX_RECORDS {
309+
let rid = RecordId(i as u32);
310+
// Check if occupied in Kernel OR already assigned in this batch
311+
if self.state.get_record(rid).is_none() && !temp_used_ids.contains(&i) {
312+
found_id = Some(rid);
313+
next_candidate = i + 1; // Optimization: start next search here
314+
break;
315+
}
316+
}
317+
318+
let id = found_id.ok_or(valori_kernel::error::KernelError::CapacityExceeded)?;
319+
temp_used_ids.insert(id.0 as usize);
320+
assigned_ids.push(id.0);
321+
322+
// Create FxpVector
323+
let mut vector = FxpVector::<D>::new_zeros();
324+
for (i, v) in values.iter().enumerate() {
325+
let fixed = (v * SCALE).round().clamp(i32::MIN as f32, i32::MAX as f32) as i32;
326+
vector.data[i] = FxpScalar(fixed);
327+
}
328+
329+
events.push(KernelEvent::InsertRecord { id, vector });
330+
}
331+
332+
let start = std::time::Instant::now();
333+
334+
// Atomic Batch Commit
335+
match committer.commit_batch(events.clone()) {
336+
Ok(CommitResult::Committed) => {
337+
tracing::info!("Batch committed: {} records", events.len());
338+
metrics::counter!("valori_events_committed_total", events.len() as u64);
339+
metrics::histogram!("valori_batch_commit_duration_seconds", start.elapsed().as_secs_f64());
340+
341+
// Sync State & Index
342+
for event in &events {
343+
self.state.apply_event(event).map_err(EngineError::Kernel)?;
344+
345+
if let KernelEvent::InsertRecord { id, vector } = event {
346+
let mut consistent_values = Vec::with_capacity(D);
347+
for i in 0..D {
348+
let fxp = vector.data[i];
349+
let f = fxp.0 as f32 / SCALE;
350+
consistent_values.push(f);
351+
}
352+
self.index.insert(id.0, &consistent_values);
353+
}
354+
}
355+
356+
Ok(assigned_ids)
357+
},
358+
Ok(CommitResult::RolledBack) => {
359+
Err(EngineError::InvalidInput("Batch validation failed (Rolled Back)".to_string()))
360+
},
361+
Err(e) => {
362+
Err(EngineError::InvalidInput(format!("Batch commit failed: {:?}", e)))
363+
}
364+
}
365+
} else {
366+
Err(EngineError::InvalidInput("Batch insert requires Event Log (legacy WAL not supported)".to_string()))
367+
}
368+
}
369+
271370
/// Apply an event that has already been committed (e.g. from replication stream or local commit).
272371
/// Updates BOTH kernel state AND auxiliary structures (Index, Bitmap).
273372
pub fn apply_committed_event(&mut self, event: &KernelEvent<D>) -> Result<(), EngineError> {

node/src/events/event_commit.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,12 @@ impl<const M: usize, const D: usize, const N: usize, const E: usize> EventCommit
233233
return Ok(CommitResult::Committed);
234234
}
235235

236-
// Step 1: Persist ALL events to disk first
237-
for event in &events {
238-
let entry = crate::events::event_log::LogEntry::Event(event.clone());
239-
self.event_log.append(&entry)?;
240-
}
236+
// Step 1: Persist ALL events to disk first (Single Fsync)
237+
let log_entries: Vec<_> = events.iter()
238+
.map(|e| crate::events::event_log::LogEntry::Event(e.clone()))
239+
.collect();
240+
241+
self.event_log.append_batch(&log_entries)?;
241242

242243
// Step 2: Add all to buffer
243244
for event in &events {

node/src/events/event_log.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,40 @@ impl<const D: usize> EventLogWriter<D> {
206206
Ok(())
207207
}
208208

209+
/// Append multiple entries to the log with a SINGLE fsync
210+
///
211+
/// This provides atomicity for batches: either all specific bytes are physically on disk
212+
/// (after fsync return) or we crash before fsync returns (and they might not be).
213+
///
214+
/// Note: If a partial write happens (less than full batch), the log recovery
215+
/// logic must handle truncation of incomplete tail writes.
216+
pub fn append_batch(&mut self, entries: &[LogEntry<D>]) -> Result<()> {
217+
if entries.is_empty() {
218+
return Ok(());
219+
}
220+
221+
for entry in entries {
222+
let bytes = bincode::serde::encode_to_vec(entry, bincode::config::standard())
223+
.map_err(|e| EventLogError::Serialization(e.to_string()))?;
224+
self.file.write_all(&bytes)?;
225+
}
226+
227+
// Flush buffer once
228+
self.file.flush()?;
229+
230+
// Force fsync once
231+
self.file.get_ref().sync_all()?;
232+
233+
// Update counts
234+
for entry in entries {
235+
if let LogEntry::Event(_) = entry {
236+
self.event_count += 1;
237+
}
238+
}
239+
240+
Ok(())
241+
}
242+
209243
/// Get the number of events written
210244
pub fn event_count(&self) -> u64 {
211245
self.event_count

node/src/server.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ pub fn build_router<const M: usize, const D: usize, const N: usize, const E: usi
8181
) -> Router {
8282
let mut app = Router::new()
8383
.route("/records", post(insert_record))
84+
.route("/v1/vectors/batch_insert", post(batch_insert)) // Phase 34
8485
.route("/search", post(search))
8586
.route("/graph/node", post(create_node))
8687
.route("/graph/edge", post(create_edge))
@@ -184,6 +185,15 @@ async fn insert_record<const M: usize, const D: usize, const N: usize, const E:
184185
Ok(Json(InsertRecordResponse { id }))
185186
}
186187

188+
async fn batch_insert<const M: usize, const D: usize, const N: usize, const E: usize>(
189+
State(state): State<SharedEngine<M, D, N, E>>,
190+
Json(payload): Json<BatchInsertRequest>,
191+
) -> Result<Json<BatchInsertResponse>, EngineError> {
192+
let mut engine = state.lock().await;
193+
let ids = engine.insert_batch(&payload.batch)?;
194+
Ok(Json(BatchInsertResponse { ids }))
195+
}
196+
187197
async fn search<const M: usize, const D: usize, const N: usize, const E: usize>(
188198
State(state): State<SharedEngine<M, D, N, E>>,
189199
Json(payload): Json<SearchRequest>,

node/tests/api_batch_ingest.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use valori_node::config::NodeConfig;
2+
use valori_node::server::build_router;
3+
use valori_node::engine::Engine;
4+
use valori_node::api::{BatchInsertRequest, BatchInsertResponse, InsertRecordRequest};
5+
use axum::{
6+
body::Body,
7+
http::{Request, StatusCode},
8+
};
9+
use tower::ServiceExt; // for oneshot
10+
use std::sync::Arc;
11+
use tokio::sync::Mutex;
12+
use tempfile::tempdir;
13+
14+
// Define concrete types matching server.rs
15+
const M: usize = 100;
16+
const D: usize = 16;
17+
const N: usize = 100;
18+
const E: usize = 200;
19+
20+
#[tokio::test]
21+
async fn test_batch_ingest_success() {
22+
let dir = tempdir().unwrap();
23+
let db_path = dir.path().join("valori.wal");
24+
let event_log_path = dir.path().join("events.log");
25+
26+
let mut config = NodeConfig::default();
27+
config.max_records = M;
28+
config.dim = D;
29+
config.max_nodes = N;
30+
config.max_edges = E;
31+
config.wal_path = Some(db_path.clone());
32+
config.event_log_path = Some(event_log_path.clone()); // Enable Event Log for Batching
33+
34+
let engine = Engine::<M, D, N, E>::new(&config);
35+
let shared_state = Arc::new(Mutex::new(engine));
36+
let app = build_router(shared_state, None);
37+
38+
// Prepare Batch
39+
let batch = vec![
40+
vec![0.1; D],
41+
vec![0.2; D],
42+
vec![0.3; D],
43+
];
44+
45+
let req = Request::builder()
46+
.method("POST")
47+
.uri("/v1/vectors/batch_insert")
48+
.header("content-type", "application/json")
49+
.body(Body::from(serde_json::to_vec(&BatchInsertRequest { batch }).unwrap()))
50+
.unwrap();
51+
52+
let response = app.oneshot(req).await.unwrap();
53+
assert_eq!(response.status(), StatusCode::OK);
54+
55+
let body_bytes = axum::body::to_bytes(response.into_body(), 1024).await.unwrap();
56+
let resp: BatchInsertResponse = serde_json::from_slice(&body_bytes).unwrap();
57+
58+
assert_eq!(resp.ids.len(), 3);
59+
assert_eq!(resp.ids, vec![0, 1, 2]); // First batch should get 0, 1, 2
60+
}
61+
62+
#[tokio::test]
63+
async fn test_batch_ingest_atomicity_failure() {
64+
let dir = tempdir().unwrap();
65+
let db_path = dir.path().join("valori.wal");
66+
let event_log_path = dir.path().join("events.log");
67+
68+
let mut config = NodeConfig::default();
69+
config.max_records = M;
70+
config.dim = D;
71+
config.max_nodes = N;
72+
config.max_edges = E;
73+
config.wal_path = Some(db_path.clone());
74+
config.event_log_path = Some(event_log_path.clone());
75+
76+
let engine = Engine::<M, D, N, E>::new(&config);
77+
let shared_state = Arc::new(Mutex::new(engine));
78+
let app = build_router(shared_state.clone(), None);
79+
80+
// Invalid payload (one vector has wrong dim)
81+
let batch = vec![
82+
vec![0.1; D],
83+
vec![0.2; D + 1], // INVALID DIM
84+
vec![0.3; D],
85+
];
86+
87+
let req = Request::builder()
88+
.method("POST")
89+
.uri("/v1/vectors/batch_insert")
90+
.header("content-type", "application/json")
91+
.body(Body::from(serde_json::to_vec(&BatchInsertRequest { batch }).unwrap()))
92+
.unwrap();
93+
94+
let response = app.oneshot(req).await.unwrap();
95+
// Should fail validation before commit
96+
// Since insert_batch validates strictly before commit, this should return 500 or 400 depending on error mapping
97+
// EngineError::InvalidInput maps to INTERNAL_SERVER_ERROR currently? or BAD_REQUEST?
98+
// Let's check api.rs/errors.rs mapping. Usually InvalidInput -> 400?
99+
// Actually, Axum doesn't auto-map EngineError.
100+
// Wait, EngineError needs IntoResponse.
101+
// Assuming standard error handling returns error code.
102+
assert!(response.status().is_client_error() || response.status().is_server_error());
103+
104+
// Verify NOTHING was inserted
105+
let engine = shared_state.lock().await;
106+
// Check ID 0 is empty
107+
assert!(engine.search_l2(&vec![0.1; D], 1).unwrap().is_empty());
108+
}

python/valori/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@
44
from .remote import RemoteClient
55

66
class Valori:
7-
def __new__(cls, remote: Optional[str] = None):
7+
def __new__(cls, remote: Optional[str] = None, path: str = "./valori_db"):
88
"""
99
Factory yielding either a LocalClient (FFI) or RemoteClient (HTTP).
1010
1111
Args:
1212
remote: If None (default), uses LocalClient (ffi).
1313
If a URL string, uses RemoteClient.
14+
path: Path to database directory (only used for LocalClient).
1415
"""
1516
if remote is None:
16-
return LocalClient()
17+
return LocalClient(path=path)
1718
else:
1819
return RemoteClient(base_url=remote)
1920

python/valori/local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
_ffi = None
1616

1717
class LocalClient:
18-
def __init__(self):
18+
def __init__(self, path: str = "./valori_db"):
1919
if _ffi is None:
2020
raise ImportError("Could not load 'valori_ffi' module. Ensure it is compiled and in PYTHONPATH.")
21-
self.kernel = _ffi.PyKernel()
21+
self.kernel = _ffi.ValoriEngine(path)
2222

2323
def insert(self, vector: List[float]) -> int:
2424
return self.kernel.insert(vector)

python/valori/remote.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ def insert(self, vector: List[float]) -> int:
1919
resp = self._post("/records", data)
2020
return resp["id"]
2121

22+
def insert_batch(self, batch: List[List[float]]) -> List[int]:
    """Insert several vectors with one request.

    Posts ``{"batch": batch}`` to ``/v1/vectors/batch_insert`` and returns
    the ``"ids"`` list from the response (the new Record IDs).
    """
    response = self._post("/v1/vectors/batch_insert", {"batch": batch})
    return response["ids"]
27+
2228
def search(self, query: List[float], k: int) -> List[Dict[str, Any]]:
2329
"""Search for nearest vectors. Returns list of hits [{'id': int, 'score': int}]."""
2430
data = {"query": query, "k": k}

0 commit comments

Comments
 (0)