Skip to content

Commit 0b6906d

Browse files
committed
feat: implement deterministic metadata support
- Added optional `metadata` field to `Record` - Updated kernel state, events, and commands to handle metadata - Implemented deterministic hashing and snapshot encoding/decoding for metadata - Bumped snapshot schema version to 2 - Added comprehensive tests for metadata persistence, sizing, and backward compatibility - Updated existing tests to accommodate API changes
1 parent 2c1982c commit 0b6906d

20 files changed

Lines changed: 472 additions & 60 deletions

src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ pub const FRAC_BITS: u32 = 16;
66

77
/// Scaling factor for Fixed-Point representation (1 << FRAC_BITS).
88
pub const SCALE: i32 = 1 << FRAC_BITS;
9+
10+
/// Maximum size in bytes for record metadata.
11+
pub const MAX_METADATA_SIZE: usize = 65536;

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub enum KernelError {
1313
InvalidOperation,
1414
/// Invalid input.
1515
InvalidInput,
16+
/// Metadata too large.
17+
MetadataTooLarge,
1618
}
1719

1820
pub type KernelResult<T> = core::result::Result<T, KernelError>;

src/event.rs

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@
1919
use crate::types::id::{RecordId, NodeId, EdgeId};
2020
use crate::types::vector::FxpVector;
2121
use crate::types::enums::{NodeKind, EdgeKind};
22-
use serde::{Serialize, Deserialize};
22+
use serde::{Serialize, Deserialize, Serializer, Deserializer};
23+
use serde::ser::{SerializeStruct, SerializeStructVariant};
24+
use serde::de::{self, Visitor, MapAccess, SeqAccess, EnumAccess, VariantAccess};
25+
use core::fmt;
2326

2427
/// KernelEvent represents the canonical event language for Valori.
2528
/// This is the ONLY way to express state transitions.
2629
///
2730
/// Each variant represents an atomic, deterministic operation on the kernel state.
28-
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
31+
#[derive(Clone, Debug, PartialEq, Eq)]
2932
pub enum KernelEvent<const D: usize> {
3033
/// Insert a new vector record into the kernel
3134
InsertRecord {
3235
id: RecordId,
3336
vector: FxpVector<D>,
37+
metadata: Option<alloc::vec::Vec<u8>>,
3438
},
3539

3640
/// Delete an existing vector record from the kernel
@@ -72,6 +76,172 @@ impl<const D: usize> KernelEvent<D> {
7276
}
7377
}
7478

79+
// Custom Serialization to support strict V2 Metadata format
80+
impl<const D: usize> Serialize for KernelEvent<D> {
81+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
82+
where
83+
S: Serializer,
84+
{
85+
match self {
86+
KernelEvent::InsertRecord { id, vector, metadata } => {
87+
// We serialize as a struct variant with 3 fields for Serialize
88+
// But specifically for metadata, we manually encode the length + bytes
89+
// To achieve "No version flag", we just write the fields.
90+
// Bincode enum serialization: [VariantIdx][Field1][Field2][...]
91+
let mut state = serializer.serialize_struct_variant("KernelEvent", 0, "InsertRecord", 3)?;
92+
state.serialize_field("id", id)?;
93+
state.serialize_field("vector", vector)?;
94+
95+
// Custom Metadata Serialization: u32 Len + Bytes
96+
// We wrap this in a helper or just serialize a "RawMetadata" struct
97+
let meta_wrapper = RawMetadata(metadata.as_ref());
98+
state.serialize_field("metadata", &meta_wrapper)?;
99+
100+
state.end()
101+
}
102+
KernelEvent::DeleteRecord { id } => {
103+
let mut state = serializer.serialize_struct_variant("KernelEvent", 1, "DeleteRecord", 1)?;
104+
state.serialize_field("id", id)?;
105+
state.end()
106+
}
107+
KernelEvent::CreateNode { id, kind, record } => {
108+
let mut state = serializer.serialize_struct_variant("KernelEvent", 2, "CreateNode", 3)?;
109+
state.serialize_field("id", id)?;
110+
state.serialize_field("kind", kind)?;
111+
state.serialize_field("record", record)?;
112+
state.end()
113+
}
114+
KernelEvent::CreateEdge { id, from, to, kind } => {
115+
let mut state = serializer.serialize_struct_variant("KernelEvent", 3, "CreateEdge", 4)?;
116+
state.serialize_field("id", id)?;
117+
state.serialize_field("from", from)?;
118+
state.serialize_field("to", to)?;
119+
state.serialize_field("kind", kind)?;
120+
state.end()
121+
}
122+
KernelEvent::DeleteEdge { id } => {
123+
let mut state = serializer.serialize_struct_variant("KernelEvent", 4, "DeleteEdge", 1)?;
124+
state.serialize_field("id", id)?;
125+
state.end()
126+
}
127+
}
128+
}
129+
}
130+
131+
struct RawMetadata<'a>(Option<&'a alloc::vec::Vec<u8>>);
132+
133+
impl<'a> Serialize for RawMetadata<'a> {
134+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
135+
where S: Serializer {
136+
match self.0 {
137+
Some(bytes) => {
138+
let len = bytes.len() as u32;
139+
// We can't write two fields here easily if we are just one field in the parent struct?
140+
// Actually Bincode flattens structs.
141+
// So if we serialize as a tuple `(len, bytes)`, it writes [len][bytes].
142+
(len, bytes).serialize(serializer)
143+
}
144+
None => {
145+
let len: u32 = 0;
146+
// Write 0 length, no bytes.
147+
len.serialize(serializer)
148+
}
149+
}
150+
}
151+
}
152+
153+
// Custom Deserialization
154+
impl<'de, const D: usize> Deserialize<'de> for KernelEvent<D> {
155+
fn deserialize<Deser>(deserializer: Deser) -> Result<Self, Deser::Error>
156+
where
157+
Deser: Deserializer<'de>,
158+
{
159+
// Use a Shadow Enum that matches the structure but uses a custom type for Metadata
160+
// This allows us to intercept the metadata deserialization for backward compatibility logic
161+
#[derive(Serialize, Deserialize)]
162+
enum KernelEventHelper<const D: usize> {
163+
InsertRecord {
164+
id: RecordId,
165+
vector: FxpVector<D>,
166+
#[serde(with = "raw_metadata_serde")]
167+
metadata: Option<alloc::vec::Vec<u8>>,
168+
},
169+
DeleteRecord {
170+
id: RecordId,
171+
},
172+
CreateNode {
173+
id: NodeId,
174+
kind: NodeKind,
175+
record: Option<RecordId>,
176+
},
177+
CreateEdge {
178+
id: EdgeId,
179+
from: NodeId,
180+
to: NodeId,
181+
kind: EdgeKind,
182+
},
183+
DeleteEdge {
184+
id: EdgeId,
185+
},
186+
}
187+
188+
// Delegate to the Helper
189+
let helper = KernelEventHelper::<D>::deserialize(deserializer)?;
190+
191+
Ok(match helper {
192+
KernelEventHelper::InsertRecord { id, vector, metadata } => KernelEvent::InsertRecord { id, vector, metadata },
193+
KernelEventHelper::DeleteRecord { id } => KernelEvent::DeleteRecord { id },
194+
KernelEventHelper::CreateNode { id, kind, record } => KernelEvent::CreateNode { id, kind, record },
195+
KernelEventHelper::CreateEdge { id, from, to, kind } => KernelEvent::CreateEdge { id, from, to, kind },
196+
KernelEventHelper::DeleteEdge { id } => KernelEvent::DeleteEdge { id },
197+
})
198+
}
199+
}
200+
201+
mod raw_metadata_serde {
202+
use super::*;
203+
use serde::{Serializer, Deserializer};
204+
205+
pub fn serialize<S>(metadata: &Option<alloc::vec::Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
206+
where S: Serializer {
207+
match metadata {
208+
Some(bytes) => {
209+
let len = bytes.len() as u32;
210+
(len, bytes).serialize(serializer)
211+
}
212+
None => {
213+
let len: u32 = 0;
214+
len.serialize(serializer)
215+
}
216+
}
217+
}
218+
219+
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<alloc::vec::Vec<u8>>, D::Error>
220+
where D: Deserializer<'de> {
221+
struct MetadataVisitor;
222+
impl<'de> Visitor<'de> for MetadataVisitor {
223+
type Value = Option<alloc::vec::Vec<u8>>;
224+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
225+
formatter.write_str("metadata length and bytes")
226+
}
227+
228+
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
229+
where A: SeqAccess<'de> {
230+
let len: u32 = seq.next_element()?.ok_or_else(|| de::Error::invalid_length(0, &self))?;
231+
232+
if len == 0 {
233+
return Ok(None);
234+
}
235+
236+
let bytes: alloc::vec::Vec<u8> = seq.next_element()?.ok_or_else(|| de::Error::invalid_length(1, &self))?;
237+
Ok(Some(bytes))
238+
}
239+
}
240+
241+
deserializer.deserialize_tuple(2, MetadataVisitor)
242+
}
243+
}
244+
75245
#[cfg(test)]
76246
mod tests {
77247
use super::*;
@@ -82,6 +252,7 @@ mod tests {
82252
let event = KernelEvent::<16>::InsertRecord {
83253
id: RecordId(42),
84254
vector: FxpVector::new_zeros(),
255+
metadata: Some(alloc::vec![0xAA, 0xBB]),
85256
};
86257

87258
let bytes1 = bincode::serde::encode_to_vec(&event, bincode::config::standard()).unwrap();

src/replay_events.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ mod tests {
152152
journal.append(KernelEvent::InsertRecord {
153153
id: RecordId(1),
154154
vector: FxpVector::new_zeros(),
155+
metadata: None,
155156
});
156157

157158
assert_eq!(journal.buffer_len(), 1);
@@ -171,6 +172,7 @@ mod tests {
171172
journal.append(KernelEvent::InsertRecord {
172173
id: RecordId(1),
173174
vector: FxpVector::new_zeros(),
175+
metadata: None,
174176
});
175177

176178
journal.discard_buffer();

src/snapshot/decode.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ pub fn decode_state<const MAX_RECORDS: usize, const D: usize, const MAX_NODES: u
5252
offset += 4;
5353

5454
let schema_ver = read_u32(buf, &mut offset)?;
55-
if schema_ver != crate::snapshot::encode::SCHEMA_VERSION {
55+
// We support V1 and V2
56+
if schema_ver != 1 && schema_ver != 2 {
5657
return Err(KernelError::InvalidOperation); // Version mismatch
5758
}
5859

@@ -81,16 +82,34 @@ pub fn decode_state<const MAX_RECORDS: usize, const D: usize, const MAX_NODES: u
8182
for i in 0..D {
8283
vector.data[i] = FxpScalar(read_i32(buf, &mut offset)?);
8384
}
85+
86+
// Metadata V2 logic
87+
let metadata = if schema_ver >= 2 {
88+
let meta_len = read_u32(buf, &mut offset)?;
89+
if meta_len > 0 {
90+
let len = meta_len as usize;
91+
if offset + len > buf.len() {
92+
return Err(KernelError::InvalidOperation); // Truncated
93+
}
94+
let mut bytes = alloc::vec![0u8; len];
95+
bytes.copy_from_slice(&buf[offset..offset+len]);
96+
offset += len;
97+
Some(bytes)
98+
} else {
99+
None
100+
}
101+
} else {
102+
None
103+
};
84104

85-
// Validation of ID bounds is implicit via array access check or capacity check above?
86-
// But id_val could be > MAX_RECORDS inside the byte stream even if cap matches.
87105
let idx = id_val as usize;
88106
if idx >= MAX_RECORDS {
89107
return Err(KernelError::CapacityExceeded);
90108
}
91109
state.records.records[idx] = Some(Record {
92110
id: RecordId(id_val),
93111
vector,
112+
metadata,
94113
flags,
95114
});
96115
}

src/snapshot/encode.rs

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::state::kernel::KernelState;
55
use crate::error::{Result, KernelError};
66

77
pub const MAGIC: &[u8; 4] = b"VALK";
8-
pub const SCHEMA_VERSION: u32 = 1;
8+
pub const SCHEMA_VERSION: u32 = 2; // Bumped for Metadata support
99

1010
/// writes a u32 to the buffer at offset
1111
fn write_u32(buf: &mut [u8], offset: &mut usize, val: u32) -> Result<()> {
@@ -18,14 +18,8 @@ fn write_u32(buf: &mut [u8], offset: &mut usize, val: u32) -> Result<()> {
1818
Ok(())
1919
}
2020

21-
fn write_u64(buf: &mut [u8], offset: &mut usize, val: u64) -> Result<()> {
22-
if *offset + 8 > buf.len() {
23-
return Err(KernelError::CapacityExceeded);
24-
}
25-
let bytes = val.to_le_bytes();
26-
buf[*offset..*offset + 8].copy_from_slice(&bytes);
27-
*offset += 8;
28-
Ok(())
21+
fn write_i32(buf: &mut [u8], offset: &mut usize, val: i32) -> Result<()> {
22+
write_u32(buf, offset, val as u32)
2923
}
3024

3125
fn write_u8(buf: &mut [u8], offset: &mut usize, val: u8) -> Result<()> {
@@ -37,13 +31,22 @@ fn write_u8(buf: &mut [u8], offset: &mut usize, val: u8) -> Result<()> {
3731
Ok(())
3832
}
3933

40-
fn write_i32(buf: &mut [u8], offset: &mut usize, val: i32) -> Result<()> {
41-
if *offset + 4 > buf.len() {
34+
fn write_u64(buf: &mut [u8], offset: &mut usize, val: u64) -> Result<()> {
35+
if *offset + 8 > buf.len() {
4236
return Err(KernelError::CapacityExceeded);
4337
}
4438
let bytes = val.to_le_bytes();
45-
buf[*offset..*offset + 4].copy_from_slice(&bytes);
46-
*offset += 4;
39+
buf[*offset..*offset + 8].copy_from_slice(&bytes);
40+
*offset += 8;
41+
Ok(())
42+
}
43+
44+
fn write_bytes(buf: &mut [u8], offset: &mut usize, data: &[u8]) -> Result<()> {
45+
if *offset + data.len() > buf.len() {
46+
return Err(KernelError::CapacityExceeded);
47+
}
48+
buf[*offset..*offset + data.len()].copy_from_slice(data);
49+
*offset += data.len();
4750
Ok(())
4851
}
4952

@@ -52,21 +55,17 @@ pub fn encode_state<const MAX_RECORDS: usize, const D: usize, const MAX_NODES: u
5255
buf: &mut [u8],
5356
) -> Result<usize> {
5457
let mut offset = 0;
55-
58+
5659
// Header
57-
if offset + 4 > buf.len() { return Err(KernelError::CapacityExceeded); }
58-
buf[offset..offset+4].copy_from_slice(MAGIC);
59-
offset += 4;
60+
write_bytes(buf, &mut offset, MAGIC)?;
61+
write_u32(buf, &mut offset, SCHEMA_VERSION)?; // Version
62+
write_u64(buf, &mut offset, state.version.0)?; // State Version
6063

61-
write_u32(buf, &mut offset, SCHEMA_VERSION)?;
62-
write_u64(buf, &mut offset, state.version.0)?;
63-
64-
// Capacities (to check compatibility on restore)
64+
// Capacities
6565
write_u32(buf, &mut offset, MAX_RECORDS as u32)?;
6666
write_u32(buf, &mut offset, D as u32)?;
6767
write_u32(buf, &mut offset, MAX_NODES as u32)?;
6868
write_u32(buf, &mut offset, MAX_EDGES as u32)?;
69-
7069
// Records
7170
let record_count = state.records.len() as u32;
7271
write_u32(buf, &mut offset, record_count)?;
@@ -77,7 +76,18 @@ pub fn encode_state<const MAX_RECORDS: usize, const D: usize, const MAX_NODES: u
7776
for scalar in record.vector.data.iter() {
7877
write_i32(buf, &mut offset, scalar.0)?;
7978
}
79+
// V2 Metadata
80+
match &record.metadata {
81+
Some(m) => {
82+
write_u32(buf, &mut offset, m.len() as u32)?;
83+
write_bytes(buf, &mut offset, m)?;
84+
}
85+
None => {
86+
write_u32(buf, &mut offset, 0)?;
87+
}
88+
}
8089
}
90+
// ...
8191

8292
// Nodes
8393
let mut node_count = 0;

0 commit comments

Comments
 (0)