Skip to content

Commit d4f11d7

Browse files
authored
Merge pull request #153 from AdaWorldAPI/claude/distributed-db-xor-s3-Ep2Zi
Claude/distributed db xor s3 ep2 zi
2 parents 0a9c379 + c483945 commit d4f11d7

10 files changed

Lines changed: 1103 additions & 41 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ crate-type = ["cdylib", "rlib"]
2424
# =============================================================================
2525

2626
[features]
27-
default = ["simd", "parallel"]
27+
default = ["simd", "parallel", "lancedb", "rustynum"]
2828

2929
# Core features
3030
simd = [] # AVX-512 SIMD for Hamming operations
@@ -150,6 +150,7 @@ num-traits = "0.2"
150150
num_cpus = "1.17"
151151
itertools = "0.13"
152152
once_cell = "1.21"
153+
ctrlc = "3.4"
153154
lazy_static = "1.5"
154155
tracing = "0.1"
155156
tracing-subscriber = "0.3"

crates/ladybug-contract/src/record.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,33 @@
1-
//! CogRecord — fixed 2 KB record: metadata container + content container.
1+
//! CogRecord — fixed 4 KB record: metadata container + content container.
22
//!
3-
//! The **holy grail** layout: every record is exactly `[Container; 2]` = 2 KB.
4-
//! Container 0 is always metadata. Container 1 is always content.
3+
//! The **holy grail** layout: every record is exactly `[Container; 2]` = 4 KB.
4+
//! Container 0 is always metadata (16K bits). Container 1 is always content (16K bits).
55
//!
66
//! ```text
77
//! ┌─────────────────────────────────────────────────────────────┐
8-
//! │ meta (1 KB) identity, NARS truth, edges, rung, RL │
8+
//! │ meta (2 KB) identity, NARS truth, edges, rung, RL │
99
//! ├─────────────────────────────────────────────────────────────┤
10-
//! │ content (1 KB) searchable fingerprint (Hamming / SIMD) │
10+
//! │ content (2 KB) searchable fingerprint (Hamming / SIMD) │
1111
//! └─────────────────────────────────────────────────────────────┘
12-
//! = 2 KB = 1 Fingerprint = 1 DN tree node = 1 Redis value
12+
//! = 4 KB = 1 Fingerprint = 1 DN tree node = 1 Redis value
1313
//! ```
1414
//!
1515
//! For multi-container geometries (Xyz, Chunked, Tree), records are
16-
//! linked through the DN tree. Each linked record is still 2 KB.
16+
//! linked through the DN tree. Each linked record is still 4 KB.
1717
1818
use std::ops::Range;
1919

2020
use crate::container::Container;
2121
use crate::geometry::ContainerGeometry;
2222
use crate::meta::{MetaView, MetaViewMut, W_REPR_BASE};
2323

24-
/// Fixed-size cognitive record: 8,192-bit metadata + 8,192-bit content = 2 KB.
24+
/// Fixed-size cognitive record: 16,384-bit metadata + 16,384-bit content = 4 KB.
2525
///
2626
/// This is the atomic unit of storage, search, and transfer:
27-
/// - DN tree value = 1 CogRecord = 2 KB
28-
/// - Redis value = 1 CogRecord = 2 KB
27+
/// - DN tree value = 1 CogRecord = 4 KB
28+
/// - Redis value = 1 CogRecord = 4 KB
2929
/// - Fingerprint = 1 CogRecord (reinterpretable)
30-
/// - SIMD scan unit = 2 × 16 AVX-512 iterations = 32 iterations
30+
/// - SIMD scan unit = 2 × 32 AVX-512 iterations = 64 iterations
3131
///
3232
/// # Stack Allocated
3333
///
@@ -43,7 +43,7 @@ pub struct CogRecord {
4343
}
4444

4545
impl CogRecord {
46-
/// Byte size of a CogRecord (2 × 1 KB = 2048 bytes).
46+
/// Byte size of a CogRecord (2 × 2 KB = 4096 bytes).
4747
pub const SIZE: usize = 2 * crate::container::CONTAINER_BYTES;
4848

4949
/// Create a new record with the given geometry.
@@ -108,18 +108,19 @@ impl CogRecord {
108108
result
109109
}
110110

111-
/// Zero-copy byte view of the entire 2 KB record.
111+
/// Zero-copy byte view of the entire 4 KB record.
112112
#[inline]
113113
pub fn as_bytes(&self) -> &[u8; Self::SIZE] {
114114
// SAFETY: CogRecord is #[repr(C, align(64))] with two Containers.
115-
// Total size = 2 × 1024 = 2048 bytes.
115+
// Total size = 2 × 2048 = 4096 bytes.
116116
unsafe { &*(self as *const CogRecord as *const [u8; Self::SIZE]) }
117117
}
118118

119-
/// Construct from a 2 KB byte slice.
119+
/// Construct from byte slice (2 × CONTAINER_BYTES).
120120
pub fn from_bytes(bytes: &[u8; Self::SIZE]) -> Self {
121-
let meta_bytes: &[u8; 1024] = bytes[..1024].try_into().unwrap();
122-
let content_bytes: &[u8; 1024] = bytes[1024..].try_into().unwrap();
121+
use crate::container::CONTAINER_BYTES;
122+
let meta_bytes: &[u8; CONTAINER_BYTES] = bytes[..CONTAINER_BYTES].try_into().unwrap();
123+
let content_bytes: &[u8; CONTAINER_BYTES] = bytes[CONTAINER_BYTES..].try_into().unwrap();
123124
Self {
124125
meta: Container::from_bytes(meta_bytes),
125126
content: Container::from_bytes(content_bytes),

crates/ladybug-contract/src/wide_meta.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,10 @@ impl<'a> WideMetaView<'a> {
216216
Self { words }
217217
}
218218

219-
/// Get a legacy MetaView over the lower 128 words.
219+
/// Get a MetaView over the words (now same width as WideMetaView).
220220
pub fn legacy(&self) -> crate::meta::MetaView<'_> {
221-
// SAFETY: lower 128 words of a 256-word array can be reinterpreted
222-
let legacy_words: &[u64; 128] = self.words[..128].try_into().unwrap();
223-
crate::meta::MetaView::new(legacy_words)
221+
// Container and WideContainer are now the same width (256 words)
222+
crate::meta::MetaView::new(self.words)
224223
}
225224

226225
// ====================================================================
@@ -481,10 +480,9 @@ impl<'a> WideMetaViewMut<'a> {
481480
WideMetaView { words: self.words }
482481
}
483482

484-
/// Get a legacy MetaViewMut over the lower 128 words.
483+
/// Get a MetaViewMut over the words (now same width as WideMetaViewMut).
485484
pub fn legacy_mut(&mut self) -> crate::meta::MetaViewMut<'_> {
486-
let legacy_words: &mut [u64; 128] = (&mut self.words[..128]).try_into().unwrap();
487-
crate::meta::MetaViewMut::new(legacy_words)
485+
crate::meta::MetaViewMut::new(self.words)
488486
}
489487

490488
// ====================================================================

src/bin/server.rs

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ use ladybug::core::simd::{self, hamming_distance};
5252
use ladybug::nars::TruthValue;
5353
use ladybug::storage::service::{CognitiveService, CpuFeatures, ServiceConfig};
5454
use ladybug::storage::{Addr, BindSpace, CogRedis, FINGERPRINT_WORDS, RedisResult};
55+
#[cfg(feature = "lancedb")]
56+
use ladybug::storage::LancePersistence;
5557
use ladybug::{FINGERPRINT_BITS, FINGERPRINT_BYTES, VERSION};
5658

5759
// =============================================================================
@@ -482,6 +484,9 @@ struct DbState {
482484
cpu: CpuFeatures,
483485
/// Start time
484486
start_time: Instant,
487+
/// Lance persistence layer
488+
#[cfg(feature = "lancedb")]
489+
persistence: Arc<LancePersistence>,
485490
}
486491

487492
impl DbState {
@@ -492,14 +497,71 @@ impl DbState {
492497
};
493498

494499
let service = CognitiveService::new(svc_config).expect("Failed to create CognitiveService");
500+
let mut cog_redis = CogRedis::new();
501+
let mut fingerprints = Vec::new();
502+
503+
#[cfg(feature = "lancedb")]
504+
let persistence = {
505+
let lance_dir = format!("{}/lance", config.data_dir);
506+
let persistence = Arc::new(LancePersistence::new(&lance_dir));
507+
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
508+
509+
if persistence.has_data() {
510+
eprintln!("[server] Lance data found, hydrating...");
511+
512+
if let Err(e) = rt.block_on(persistence.ensure_tables()) {
513+
eprintln!("[server] WARNING: ensure_tables failed: {}", e);
514+
}
515+
516+
match rt.block_on(persistence.hydrate()) {
517+
Ok(Some(space)) => {
518+
let node_count = space.nodes_iter()
519+
.filter(|(a, _)| a.prefix() > 0x0F)
520+
.count();
521+
let edge_count = space.edges_iter().count();
522+
cog_redis.replace_bind_space(space);
523+
eprintln!(
524+
"[server] Hydrated BindSpace: {} nodes, {} edges",
525+
node_count, edge_count
526+
);
527+
}
528+
Ok(None) => {
529+
eprintln!("[server] No BindSpace data to hydrate");
530+
}
531+
Err(e) => {
532+
eprintln!("[server] WARNING: BindSpace hydration failed: {}", e);
533+
}
534+
}
535+
536+
match rt.block_on(persistence.hydrate_index()) {
537+
Ok(fps) if !fps.is_empty() => {
538+
eprintln!("[server] Hydrated {} index fingerprints", fps.len());
539+
fingerprints = fps;
540+
}
541+
Ok(_) => {}
542+
Err(e) => {
543+
eprintln!("[server] WARNING: Index hydration failed: {}", e);
544+
}
545+
}
546+
} else {
547+
eprintln!("[server] No Lance data found, starting fresh");
548+
if let Err(e) = rt.block_on(persistence.ensure_tables()) {
549+
eprintln!("[server] WARNING: ensure_tables failed: {}", e);
550+
}
551+
}
552+
553+
persistence
554+
};
495555

496556
Self {
497-
fingerprints: Vec::new(),
557+
fingerprints,
498558
kv: HashMap::new(),
499-
cog_redis: CogRedis::new(),
559+
cog_redis,
500560
service,
501561
cpu: CpuFeatures::detect(),
502562
start_time: Instant::now(),
563+
#[cfg(feature = "lancedb")]
564+
persistence,
503565
}
504566
}
505567
}
@@ -1062,7 +1124,7 @@ fn handle_xor(body: &str, format: ResponseFormat) -> Vec<u8> {
10621124
// XOR: one pass over 256 u64 words — nanoseconds
10631125
let mut result = Fingerprint::zero();
10641126
for i in 0..FINGERPRINT_WORDS {
1065-
result.words_mut()[i] = fp_a.words()[i] ^ fp_b.words()[i];
1127+
result.as_raw_mut()[i] = fp_a.as_raw()[i] ^ fp_b.as_raw()[i];
10661128
}
10671129

10681130
match format {
@@ -1105,7 +1167,7 @@ fn handle_xor_verify(body: &str, format: ResponseFormat) -> Vec<u8> {
11051167
// XOR: identical fingerprints produce all zeros
11061168
let mut check = Fingerprint::zero();
11071169
for i in 0..FINGERPRINT_WORDS {
1108-
check.words_mut()[i] = fp_a.words()[i] ^ fp_b.words()[i];
1170+
check.as_raw_mut()[i] = fp_a.as_raw()[i] ^ fp_b.as_raw()[i];
11091171
}
11101172
let residual = check.popcount();
11111173
let valid = residual == 0;
@@ -3555,6 +3617,79 @@ fn main() {
35553617

35563618
let state: SharedState = Arc::new(RwLock::new(DbState::new(&config)));
35573619

3620+
// Spawn background persistence thread (flush every 30 seconds)
3621+
#[cfg(feature = "lancedb")]
3622+
{
3623+
let persist_state = Arc::clone(&state);
3624+
std::thread::spawn(move || {
3625+
let rt = tokio::runtime::Runtime::new().expect("persist runtime");
3626+
loop {
3627+
std::thread::sleep(std::time::Duration::from_secs(30));
3628+
3629+
// Check if there are dirty addresses (quick read lock)
3630+
let has_dirty = {
3631+
let db = persist_state.read().unwrap();
3632+
db.cog_redis.bind_space().dirty_addrs().next().is_some()
3633+
};
3634+
3635+
if has_dirty {
3636+
// Persist under read lock
3637+
let persist_result = {
3638+
let db = persist_state.read().unwrap();
3639+
let persistence = Arc::clone(&db.persistence);
3640+
let bs = db.cog_redis.bind_space();
3641+
rt.block_on(persistence.persist_full(bs))
3642+
};
3643+
3644+
match persist_result {
3645+
Ok(()) => {
3646+
if let Ok(mut db) = persist_state.write() {
3647+
db.cog_redis.bind_space_mut().clear_dirty();
3648+
}
3649+
eprintln!("[persist] BindSpace flushed to Lance");
3650+
}
3651+
Err(e) => {
3652+
eprintln!("[persist] BindSpace flush failed: {}", e);
3653+
}
3654+
}
3655+
}
3656+
3657+
// Persist index fingerprints
3658+
{
3659+
let db = persist_state.read().unwrap();
3660+
if !db.fingerprints.is_empty() {
3661+
let persistence = Arc::clone(&db.persistence);
3662+
if let Err(e) = rt.block_on(persistence.persist_index(&db.fingerprints)) {
3663+
eprintln!("[persist] Index flush failed: {}", e);
3664+
}
3665+
}
3666+
}
3667+
}
3668+
});
3669+
}
3670+
3671+
// Register SIGTERM/SIGINT handler for graceful shutdown
3672+
#[cfg(feature = "lancedb")]
3673+
{
3674+
let shutdown_state = Arc::clone(&state);
3675+
ctrlc::set_handler(move || {
3676+
eprintln!("\n[shutdown] Flushing to Lance before exit...");
3677+
let rt = tokio::runtime::Runtime::new().expect("shutdown runtime");
3678+
let db = shutdown_state.read().unwrap();
3679+
let persistence = Arc::clone(&db.persistence);
3680+
3681+
if let Err(e) = rt.block_on(persistence.persist_full(db.cog_redis.bind_space())) {
3682+
eprintln!("[shutdown] BindSpace flush failed: {}", e);
3683+
}
3684+
if let Err(e) = rt.block_on(persistence.persist_index(&db.fingerprints)) {
3685+
eprintln!("[shutdown] Index flush failed: {}", e);
3686+
}
3687+
eprintln!("[shutdown] Done. Exiting.");
3688+
std::process::exit(0);
3689+
})
3690+
.expect("Failed to set Ctrl-C handler");
3691+
}
3692+
35583693
// Spawn UDP bitpacked Hamming listener
35593694
spawn_udp_listener(&config.host, udp_port, Arc::clone(&state));
35603695

@@ -3565,6 +3700,7 @@ fn main() {
35653700

35663701
println!("Listening on http://{}", addr);
35673702
println!("UDP Hamming on udp://{}:{}", config.host, udp_port);
3703+
println!("Lance persistence: {}/lance", config.data_dir);
35683704

35693705
// Accept connections
35703706
for stream in listener.incoming() {

src/qualia/felt_parse.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -888,9 +888,10 @@ impl MirrorField {
888888
let self_fp = encode_axes(self_axes);
889889
let partner_fp = encode_axes(partner_axes);
890890

891-
let mut self_words = [0u64; 128];
892-
let mut thou_words = [0u64; 128];
893-
for i in 0..128.min(self_fp.len()) {
891+
use crate::container::CONTAINER_WORDS;
892+
let mut self_words = [0u64; CONTAINER_WORDS];
893+
let mut thou_words = [0u64; CONTAINER_WORDS];
894+
for i in 0..CONTAINER_WORDS.min(self_fp.len()) {
894895
self_words[i] = self_fp[i];
895896
thou_words[i] = partner_fp[i];
896897
}

src/query/datafusion.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use datafusion::execution::context::SessionContext;
3131
use datafusion::prelude::*;
3232
use std::sync::Arc;
3333

34-
use crate::Result;
34+
use crate::{Error, Result};
3535
use crate::core::DIM;
3636

3737
// =============================================================================
@@ -124,7 +124,7 @@ impl SqlEngine {
124124
if std::path::Path::new(&nodes_path).exists() {
125125
let dataset = Dataset::open(&nodes_path)
126126
.await
127-
.map_err(|e| Error::Storage(e.to_string()))?;
127+
.map_err(|e: lance::Error| Error::Storage(e.to_string()))?;
128128
let lance_schema = dataset.schema().clone();
129129
let arrow_schema: ArrowSchema = ArrowSchema::from(&lance_schema);
130130

@@ -133,13 +133,13 @@ impl SqlEngine {
133133
.scan()
134134
.try_into_stream()
135135
.await
136-
.map_err(|e| Error::Storage(e.to_string()))?;
136+
.map_err(|e: lance::Error| Error::Storage(e.to_string()))?;
137137

138138
use futures::StreamExt;
139-
let mut all_batches = Vec::new();
139+
let mut all_batches: Vec<RecordBatch> = Vec::new();
140140
let mut stream = batches;
141141
while let Some(batch) = stream.next().await {
142-
all_batches.push(batch.map_err(|e| Error::Storage(e.to_string()))?);
142+
all_batches.push(batch.map_err(|e: lance::Error| Error::Storage(e.to_string()))?);
143143
}
144144

145145
if !all_batches.is_empty() {
@@ -153,21 +153,21 @@ impl SqlEngine {
153153
if std::path::Path::new(&edges_path).exists() {
154154
let dataset = Dataset::open(&edges_path)
155155
.await
156-
.map_err(|e| Error::Storage(e.to_string()))?;
156+
.map_err(|e: lance::Error| Error::Storage(e.to_string()))?;
157157
let lance_schema = dataset.schema().clone();
158158
let arrow_schema: ArrowSchema = ArrowSchema::from(&lance_schema);
159159

160160
let batches = dataset
161161
.scan()
162162
.try_into_stream()
163163
.await
164-
.map_err(|e| Error::Storage(e.to_string()))?;
164+
.map_err(|e: lance::Error| Error::Storage(e.to_string()))?;
165165

166166
use futures::StreamExt;
167-
let mut all_batches = Vec::new();
167+
let mut all_batches: Vec<RecordBatch> = Vec::new();
168168
let mut stream = batches;
169169
while let Some(batch) = stream.next().await {
170-
all_batches.push(batch.map_err(|e| Error::Storage(e.to_string()))?);
170+
all_batches.push(batch.map_err(|e: lance::Error| Error::Storage(e.to_string()))?);
171171
}
172172

173173
if !all_batches.is_empty() {

0 commit comments

Comments
 (0)