-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.rs
More file actions
404 lines (340 loc) · 12.9 KB
/
Copy pathdatabase.rs
File metadata and controls
404 lines (340 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//! Main Database API - unified interface for all operations
//!
//! Provides a single entry point for all LadybugDB operations:
//! - SQL queries (via DataFusion)
//! - Cypher queries (via transpilation)
//! - Vector search (via LanceDB ANN)
//! - Hamming/resonance search (via SIMD engine)
//! - Graph traversal and butterfly detection
use crate::core::{Fingerprint, HammingEngine};
use crate::cognitive::Thought;
use crate::nars::TruthValue;
use crate::graph::{Edge, Traversal};
use crate::query::{Query, QueryResult, cypher_to_sql, SqlEngine, QueryBuilder};
use crate::storage::{LanceStore, NodeRecord, EdgeRecord};
use crate::{Result, Error};
use arrow::record_batch::RecordBatch;
use std::path::Path;
use std::sync::Arc;
use parking_lot::RwLock;
/// Main database handle - unified access to all operations
///
/// Bundles the Lance storage backend, the SQL engine and the in-memory
/// Hamming index. All three sit behind `Arc`s, so [`Database::fork`] yields a
/// handle that shares them with the original rather than copying them.
pub struct Database {
/// Path given to `open` (lossily converted to UTF-8), or `":memory:"` for
/// databases created with `Database::memory`.
path: String,
/// Lance storage backend. Uses a Tokio lock because the guard is held across
/// `.await` in the query/CRUD methods.
lance: Arc<tokio::sync::RwLock<LanceStore>>,
/// SQL execution engine used by `sql`, `sql_params` and `cypher`.
sql_engine: Arc<tokio::sync::RwLock<SqlEngine>>,
/// Hamming search engine (pre-indexed, in-memory), filled by
/// `index_fingerprints`. Guarded by a synchronous `parking_lot` lock.
hamming: Arc<RwLock<HammingEngine>>,
/// Counter incremented by `fork`. Storage is shared between forks, not
/// copied, so this does NOT identify an independent copy-on-write snapshot.
version: u64,
}
impl Database {
    // =========================================================================
    // CONSTRUCTION
    // =========================================================================

    /// Open an on-disk database at `path`, creating the directory if needed.
    ///
    /// The Lance store and the SQL engine are both opened from the same path.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or if either the
    /// Lance store or the SQL engine fails to open.
    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        // Non-UTF-8 path components are replaced by U+FFFD here.
        let path_str = path.as_ref().to_string_lossy().to_string();
        // Blocking filesystem call inside an async fn; it runs once per open.
        std::fs::create_dir_all(&path_str)?;
        let lance = LanceStore::open(&path_str).await?;
        // NOTE(review): the SQL engine is opened from the path independently of
        // `lance`; confirm that rows written through `lance` are visible to SQL.
        let sql_engine = SqlEngine::with_database(&path_str).await?;
        Ok(Self {
            path: path_str,
            lance: Arc::new(tokio::sync::RwLock::new(lance)),
            sql_engine: Arc::new(tokio::sync::RwLock::new(sql_engine)),
            hamming: Arc::new(RwLock::new(HammingEngine::new())),
            version: 0,
        })
    }

    /// Open a database from synchronous code.
    ///
    /// Builds a temporary Tokio runtime and drives [`Database::open`] to
    /// completion on it. The runtime is dropped before this function returns.
    ///
    /// # Panics
    ///
    /// Tokio's `Runtime::block_on` panics when called from within an
    /// asynchronous execution context; use [`Database::open`] there instead.
    ///
    /// # Errors
    ///
    /// Fails if the runtime cannot be created or if `open` fails.
    pub fn open_sync<P: AsRef<Path>>(path: P) -> Result<Self> {
        let rt = tokio::runtime::Runtime::new()?;
        // NOTE(review): `rt` is dropped on return; any background tasks the Lance
        // store may have spawned on it stop with it — confirm none are required.
        rt.block_on(Self::open(path))
    }

    /// Create a database backed by in-memory storage; its path is `":memory:"`.
    ///
    /// NOTE(review): the SQL engine here is `SqlEngine::default()` rather than
    /// one built from the Lance store, so SQL/Cypher queries may not see nodes
    /// added through `add_node` — confirm.
    pub fn memory() -> Self {
        Self {
            path: ":memory:".to_string(),
            lance: Arc::new(tokio::sync::RwLock::new(LanceStore::memory())),
            sql_engine: Arc::new(tokio::sync::RwLock::new(SqlEngine::default())),
            hamming: Arc::new(RwLock::new(HammingEngine::new())),
            version: 0,
        }
    }

    // =========================================================================
    // SQL OPERATIONS
    // =========================================================================

    /// Execute a SQL query and return the resulting Arrow batches.
    pub async fn sql(&self, query: &str) -> Result<Vec<RecordBatch>> {
        let engine = self.sql_engine.read().await;
        engine.execute(query).await
    }

    /// Execute a SQL query with named parameter values.
    ///
    /// Prefer this over string formatting whenever values come from callers.
    pub async fn sql_params(
        &self,
        query: &str,
        params: &[(&str, datafusion::scalar::ScalarValue)],
    ) -> Result<Vec<RecordBatch>> {
        let engine = self.sql_engine.read().await;
        engine.execute_with_params(query, params).await
    }

    /// Start a [`QueryBuilder`] over the `nodes` table.
    ///
    /// Nothing is executed here. The function is `async` only for API
    /// compatibility; it never awaits.
    pub async fn query(&self) -> QueryBuilder {
        QueryBuilder::from("nodes")
    }

    // =========================================================================
    // CYPHER OPERATIONS
    // =========================================================================

    /// Execute a Cypher query by transpiling it to SQL with `cypher_to_sql`.
    ///
    /// # Errors
    ///
    /// Fails if transpilation fails or the resulting SQL fails to execute.
    pub async fn cypher(&self, query: &str) -> Result<Vec<RecordBatch>> {
        let sql = cypher_to_sql(query)?;
        self.sql(&sql).await
    }

    // =========================================================================
    // VECTOR OPERATIONS
    // =========================================================================
    // The Lance lock is taken for writing even for reads — presumably because
    // `LanceStore`'s query methods take `&mut self`; confirm before relaxing.

    /// Approximate nearest-neighbour search; returns up to `k` `(node, score)` pairs.
    pub async fn vector_search(
        &self,
        embedding: &[f32],
        k: usize,
    ) -> Result<Vec<(NodeRecord, f32)>> {
        let mut lance = self.lance.write().await;
        lance.vector_search(embedding, k, None).await
    }

    /// Like [`Database::vector_search`], restricted by a Lance filter expression.
    pub async fn vector_search_filtered(
        &self,
        embedding: &[f32],
        k: usize,
        filter: &str,
    ) -> Result<Vec<(NodeRecord, f32)>> {
        let mut lance = self.lance.write().await;
        lance.vector_search(embedding, k, Some(filter)).await
    }

    // =========================================================================
    // HAMMING/RESONANCE OPERATIONS
    // =========================================================================

    /// Hamming-similarity search over the in-memory index.
    ///
    /// Returns at most `limit` `(index, similarity)` pairs whose similarity
    /// passes `threshold`. The middle element of each engine hit is discarded.
    /// NOTE(review): `index` presumably refers to the position in the
    /// fingerprints passed to `index_fingerprints` — confirm.
    pub fn resonate(
        &self,
        fingerprint: &Fingerprint,
        threshold: f32,
        limit: usize,
    ) -> Vec<(usize, f32)> {
        let engine = self.hamming.read();
        engine
            .search_threshold(fingerprint, threshold, limit)
            .into_iter()
            .map(|(idx, _, sim)| (idx, sim))
            .collect()
    }

    /// Hamming search over fingerprints persisted in Lance storage.
    pub async fn resonate_lance(
        &self,
        fingerprint: &Fingerprint,
        k: usize,
        threshold: Option<f32>,
    ) -> Result<Vec<(NodeRecord, u32, f32)>> {
        let mut lance = self.lance.write().await;
        lance.hamming_search(fingerprint, k, threshold).await
    }

    /// Resonance search keyed by text: fingerprints `content`, then calls `resonate`.
    pub fn resonate_content(
        &self,
        content: &str,
        threshold: f32,
        limit: usize,
    ) -> Vec<(usize, f32)> {
        let fp = Fingerprint::from_content(content);
        self.resonate(&fp, threshold, limit)
    }

    /// Load fingerprints into the in-memory Hamming index.
    ///
    /// NOTE(review): whether `HammingEngine::index` replaces or appends to the
    /// existing index is not visible here — confirm.
    pub fn index_fingerprints(&self, fingerprints: Vec<Fingerprint>) {
        let mut engine = self.hamming.write();
        engine.index(fingerprints);
    }

    // =========================================================================
    // GRAPH OPERATIONS
    // =========================================================================

    /// Build a [`Traversal`] starting at `start_id`; nothing is executed here.
    pub fn traverse(&self, start_id: &str) -> Traversal {
        Traversal::from(start_id)
    }

    /// Detect butterfly effects (causal amplification chains) from `source_id`.
    ///
    /// Follows `CAUSES`/`AMPLIFIES` edges for 1..=`max_depth` hops and keeps
    /// rows whose amplification exceeds `threshold`.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidInput` error if `source_id` contains `'` or `\`, or
    /// if `threshold` is not finite. Also fails on transpilation or SQL errors.
    pub async fn detect_butterflies(
        &self,
        source_id: &str,
        threshold: f32,
        max_depth: usize,
    ) -> Result<Vec<RecordBatch>> {
        let source = Self::cypher_id_literal(source_id)?;
        if !threshold.is_finite() {
            // NaN/inf would otherwise be formatted into the SQL as bare tokens.
            return Self::invalid_input(format!(
                "butterfly threshold must be finite, got {}",
                threshold
            ));
        }
        let cypher = format!(
            "MATCH (source)-[:CAUSES|AMPLIFIES*1..{}]->(target) \
             WHERE source.id = {} \
             RETURN target, path, amplification",
            max_depth, source
        );
        let mut sql = cypher_to_sql(&cypher)?;
        // NOTE(review): assumes the transpiled SQL ends inside a WHERE clause and
        // aliases the edge table as `t`; confirm against `cypher_to_sql`.
        sql.push_str(&format!("\n AND t.amplification > {}", threshold));
        self.sql(&sql).await
    }

    /// Impact analysis for a potential change to node `change_id`.
    ///
    /// Collects every node reachable over `CAUSES`/`AMPLIFIES`/`ENABLES` edges
    /// within `MAX_DEPTH` hops, plus the butterfly effects above
    /// `BUTTERFLY_THRESHOLD` within the same depth.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidInput` error if `change_id` contains `'` or `\`;
    /// otherwise propagates query errors.
    pub async fn impact_analysis(&self, change_id: &str) -> Result<ImpactReport> {
        // Shared by both queries so they always explore the same neighbourhood.
        const MAX_DEPTH: usize = 10;
        const BUTTERFLY_THRESHOLD: f32 = 5.0;

        let source = Self::cypher_id_literal(change_id)?;
        let affected = self
            .cypher(&format!(
                "MATCH (source)-[:CAUSES|AMPLIFIES|ENABLES*1..{}]->(affected) \
                 WHERE source.id = {} \
                 RETURN affected",
                MAX_DEPTH, source
            ))
            .await?;
        let butterflies = self
            .detect_butterflies(change_id, BUTTERFLY_THRESHOLD, MAX_DEPTH)
            .await?;

        let total_affected = affected.iter().map(RecordBatch::num_rows).sum();
        let butterfly_count = butterflies.iter().map(RecordBatch::num_rows).sum();
        Ok(ImpactReport {
            total_affected,
            butterfly_count,
            affected_batches: affected,
            butterfly_batches: butterflies,
        })
    }

    // =========================================================================
    // CRUD OPERATIONS
    // =========================================================================

    /// Insert a single node into Lance storage.
    pub async fn add_node(&self, node: NodeRecord) -> Result<()> {
        let mut lance = self.lance.write().await;
        lance.insert_node(&node).await
    }

    /// Insert several nodes while holding the storage lock once.
    pub async fn add_nodes(&self, nodes: &[NodeRecord]) -> Result<()> {
        let mut lance = self.lance.write().await;
        lance.insert_nodes(nodes).await
    }

    /// Insert a single edge into Lance storage.
    pub async fn add_edge(&self, edge: EdgeRecord) -> Result<()> {
        let mut lance = self.lance.write().await;
        lance.insert_edge(&edge).await
    }

    /// Look up a node by ID; `Ok(None)` when it does not exist.
    pub async fn get_node(&self, id: &str) -> Result<Option<NodeRecord>> {
        let mut lance = self.lance.write().await;
        lance.get_node(id).await
    }

    /// Return the outgoing edges of `from_id`.
    pub async fn get_edges_from(&self, from_id: &str) -> Result<Vec<EdgeRecord>> {
        let mut lance = self.lance.write().await;
        lance.get_edges_from(from_id).await
    }

    /// Store a [`Thought`] as a `"Thought"` node and return its ID.
    ///
    /// The thought's fingerprint is attached when present.
    pub async fn add_thought(&self, thought: &Thought) -> Result<String> {
        let node = NodeRecord::new(&thought.id, "Thought")
            .with_qidx(thought.qidx)
            .with_content(&thought.content);
        let node = if let Some(fp) = thought.fingerprint.as_ref() {
            node.with_fingerprint(fp)
        } else {
            node
        };
        self.add_node(node).await?;
        Ok(thought.id.clone())
    }

    /// Create a `CAUSES` edge carrying an amplification factor.
    pub async fn causes(&self, from_id: &str, to_id: &str, amplification: f32) -> Result<()> {
        let edge = EdgeRecord::new(from_id, to_id, "CAUSES").with_amplification(amplification);
        self.add_edge(edge).await
    }

    /// Create an `ENABLES` edge (no amplification factor).
    pub async fn enables(&self, from_id: &str, to_id: &str) -> Result<()> {
        let edge = EdgeRecord::new(from_id, to_id, "ENABLES");
        self.add_edge(edge).await
    }

    /// Create an `AMPLIFIES` edge whose amplification is `factor`.
    pub async fn amplifies(&self, from_id: &str, to_id: &str, factor: f32) -> Result<()> {
        let edge = EdgeRecord::new(from_id, to_id, "AMPLIFIES").with_amplification(factor);
        self.add_edge(edge).await
    }

    // =========================================================================
    // COUNTERFACTUAL OPERATIONS
    // =========================================================================

    /// Create a new handle with `version` incremented by one.
    ///
    /// All three backends are `Arc::clone`d, so the fork SHARES storage, the
    /// SQL engine and the Hamming index with `self`. Writes through either
    /// handle are visible to both; this is not an isolated copy.
    pub fn fork(&self) -> Database {
        Database {
            path: self.path.clone(),
            lance: Arc::clone(&self.lance),
            sql_engine: Arc::clone(&self.sql_engine),
            hamming: Arc::clone(&self.hamming),
            version: self.version + 1,
        }
    }

    // =========================================================================
    // DATABASE INFO
    // =========================================================================

    /// Path this database was opened with, or `":memory:"`.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Fork counter: 0 for a freshly opened database.
    pub fn version(&self) -> u64 {
        self.version
    }

    /// Number of fingerprints in the in-memory Hamming index.
    pub fn fingerprint_count(&self) -> usize {
        self.hamming.read().len()
    }

    // =========================================================================
    // PRIVATE HELPERS
    // =========================================================================

    /// Render `id` as a single-quoted Cypher string literal.
    ///
    /// IDs are spliced into query text, so a quote or backslash could end the
    /// literal early and inject query syntax. Such IDs are rejected rather
    /// than escaped because the escaping rules honoured by `cypher_to_sql`
    /// are not visible here.
    fn cypher_id_literal(id: &str) -> Result<String> {
        if id.chars().any(|c| c == '\'' || c == '\\') {
            return Self::invalid_input(format!(
                "node id {:?} contains a quote or backslash and cannot be embedded in a query",
                id
            ));
        }
        Ok(format!("'{}'", id))
    }

    /// Build an `InvalidInput` error in the crate's error type.
    ///
    /// Relies on the crate error converting from `std::io::Error`, as the `?`
    /// on `create_dir_all` in `open` already requires.
    fn invalid_input<T>(message: String) -> Result<T> {
        Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, message).into())
    }
}
/// Impact analysis report
///
/// Produced by `Database::impact_analysis`; the counts are row totals over the
/// corresponding batch vectors.
#[derive(Debug)]
pub struct ImpactReport {
/// Sum of `num_rows()` over `affected_batches`.
/// NOTE(review): a node reachable by several paths may be counted more than
/// once, depending on how `cypher_to_sql` expands variable-length paths.
pub total_affected: usize,
/// Sum of `num_rows()` over `butterfly_batches`.
pub butterfly_count: usize,
/// Raw result of the reachability query from the changed node.
pub affected_batches: Vec<RecordBatch>,
/// Raw result of `Database::detect_butterflies` for the changed node.
pub butterfly_batches: Vec<RecordBatch>,
}
/// Open (or create) a database at `path`, blocking until it is ready.
///
/// Free-function shorthand for [`Database::open_sync`]; see it for the
/// runtime and error behaviour.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database> {
    let db = Database::open_sync(path)?;
    Ok(db)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An in-memory database reports the sentinel `":memory:"` path.
    #[test]
    fn test_open_memory() {
        let db = Database::memory();
        let path = db.path();
        assert_eq!(path, ":memory:");
    }

    /// Querying with an indexed fingerprint yields a near-exact top hit.
    #[test]
    fn test_resonate() {
        let db = Database::memory();

        let corpus = (0..100)
            .map(|i| format!("thought_{}", i))
            .map(|text| Fingerprint::from_content(&text))
            .collect::<Vec<Fingerprint>>();
        db.index_fingerprints(corpus);

        let probe = Fingerprint::from_content("thought_50");
        let hits = db.resonate(&probe, 0.5, 10);

        assert!(!hits.is_empty());
        let (_, top_similarity) = hits[0];
        assert!(top_similarity > 0.99);
    }

    /// Forking bumps the version counter by exactly one.
    #[test]
    fn test_fork() {
        let original = Database::memory();
        let child = original.fork();
        assert_eq!(child.version(), original.version() + 1);
    }

    /// A simple relationship pattern transpiles to a SELECT joined on `edges`.
    #[tokio::test]
    async fn test_cypher_transpile() {
        let sql = cypher_to_sql("MATCH (a:Thought)-[:CAUSES]->(b:Thought) RETURN b").unwrap();
        for fragment in ["SELECT", "JOIN edges"] {
            assert!(sql.contains(fragment));
        }
    }
}