Skip to content

Commit 5fda44a

Browse files
committed
embedded: Hardening Phase 4 - Shadow Buffer and Partial Command Handling
1 parent 037e6ed commit 5fda44a

2 files changed

Lines changed: 188 additions & 39 deletions

File tree

embedded/src/shadow.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
extern crate alloc;
2+
use alloc::vec::Vec;
3+
use blake3::Hasher;
4+
5+
use valori_kernel::state::kernel::KernelState;
6+
use crate::wal;
7+
8+
// -----------------------------------------------------------------------
9+
// Shadow Kernel (Provisional Execution)
10+
// -----------------------------------------------------------------------
11+
12+
pub struct ShadowKernel<'a, const M: usize, const D: usize, const N: usize, const E: usize> {
13+
pub state: &'a mut KernelState<M, D, N, E>,
14+
pub wal_accumulator: Hasher,
15+
pub segment_active: bool,
16+
pub buffer: Vec<u8>,
17+
pub header_processed: bool,
18+
}
19+
20+
impl<'a, const M: usize, const D: usize, const N: usize, const E: usize> ShadowKernel<'a, M, D, N, E> {
21+
pub fn new(state: &'a mut KernelState<M, D, N, E>) -> Self {
22+
Self {
23+
state,
24+
wal_accumulator: Hasher::new(),
25+
segment_active: false,
26+
buffer: Vec::new(),
27+
header_processed: false,
28+
}
29+
}
30+
31+
pub fn start_segment(&mut self) {
32+
self.wal_accumulator = Hasher::new();
33+
self.segment_active = true;
34+
self.buffer.clear();
35+
self.header_processed = false;
36+
}
37+
38+
/// Apply a WAL chunk to the Shadow Kernel.
39+
/// Buffers data and applies only complete commands.
40+
/// Updates accumulator only for APPLIED commands.
41+
pub fn apply_chunk(&mut self, chunk: &[u8]) -> Result<(), ()> {
42+
if !self.segment_active {
43+
return Err(());
44+
}
45+
46+
self.buffer.extend_from_slice(chunk);
47+
48+
// Process Loop
49+
loop {
50+
// 1. Header Check (Once)
51+
if !self.header_processed {
52+
if self.buffer.is_empty() { return Ok(()); } // Need more data
53+
54+
let version = self.buffer[0];
55+
if version != 1 {
56+
return Err(()); // Bad Version
57+
}
58+
59+
// Accumulate Header Byte?
60+
// User: "Running Hash Accumulator... incrementall per applied command"
61+
// Usually Header is part of the "WAL Log Hash".
62+
// I will include it.
63+
self.wal_accumulator.update(&[version]);
64+
65+
self.buffer.remove(0); // Inefficient for Vec, but low freq (once).
66+
self.header_processed = true;
67+
}
68+
69+
if self.buffer.is_empty() { break; }
70+
71+
// 2. Try Apply Command
72+
// We pass a slice.
73+
match wal::try_apply_command(self.state, &self.buffer) {
74+
wal::ApplyResult::Applied(bytes_consumed) => {
75+
// Update Hash with consumed bytes (Command Data)
76+
let cmd_bytes = &self.buffer[0..bytes_consumed];
77+
self.wal_accumulator.update(cmd_bytes);
78+
79+
// Remove from buffer (inefficient drain from front, use VecDeque if std available, or circular buf if optimization needed. For Phase 4, Vec::drain is acceptable for correctness proof).
80+
// self.buffer.drain(0..bytes_consumed); // drain returns iterator, drop it.
81+
// drain is available in alloc::vec::Vec.
82+
let _ = self.buffer.drain(0..bytes_consumed);
83+
},
84+
wal::ApplyResult::Incomplete => {
85+
// Stop and wait for more data
86+
break;
87+
},
88+
wal::ApplyResult::Error => {
89+
return Err(()); // Invalid Data -> Halt
90+
}
91+
}
92+
}
93+
94+
Ok(())
95+
}
96+
97+
/// Finalize segment and return Accumulator Hash.
98+
pub fn get_accumulator_hash(&self) -> [u8; 32] {
99+
*self.wal_accumulator.finalize().as_bytes()
100+
}
101+
}

embedded/src/wal.rs

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -51,50 +51,98 @@ fn read_i32(buf: &[u8], offset: &mut usize) -> Result<i32, ()> {
5151
Ok(i32::from_le_bytes(bytes))
5252
}
5353

54-
pub fn apply_wal_log<const M: usize, const D: usize, const N: usize, const E: usize>(
54+
/// Outcome of trying to decode and apply one WAL command from a byte buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyResult {
    /// A complete command was applied; the payload is the number of bytes it
    /// occupied at the front of the buffer.
    Applied(usize),
    /// The buffer ends before the command does; retry once more data arrives.
    Incomplete,
    /// The data is invalid or the command was rejected.
    Error,
}
59+
60+
/// Try to apply a single command from the buffer.
61+
/// Returns byte count consumed, or status.
62+
pub fn try_apply_command<const M: usize, const D: usize, const N: usize, const E: usize>(
5563
state: &mut KernelState<M, D, N, E>,
56-
wal_bytes: &[u8]
57-
) -> Result<(), ()> {
64+
buf: &[u8]
65+
) -> ApplyResult {
5866
let mut offset = 0;
5967

60-
// 1. Check WAL Version
61-
// First byte reserved for format version.
62-
let version = read_u8(wal_bytes, &mut offset)?;
63-
if version != WAL_VERSION {
64-
return Err(()); // Unsupported version
65-
}
68+
// 1. Check WAL Version (Only if at start of buffer? No, Version is Stream Header?
69+
// Wait, users previous prompt "Each packet includes WAL_VERSION... chunk data".
70+
// Is the "WAL Stream" versioned, or the "Command Log" versioned?
71+
// In Phase 3, I put `WAL_VERSION` byte at start of `apply_wal_log`.
72+
// In Phase 4, the *Stream* has a version in Packet Header.
73+
// Does the *Payload* (the concatenated command log) have a version?
74+
// Phase 3 `main.rs` constructed payload with `0x01` at index 0.
75+
// If we buffer chunks, the first byte of the *assembled stream* is Sequence 0?
76+
// Or is every command versioned?
77+
// Phase 3 `wal.rs` checks version *once* at start of `apply_wal_log`.
78+
// If we are streaming, we only see the start once (at the beginning of time/segment).
79+
// The `ShadowKernel` should handle the "Stream Header" or "Log Header" byte.
80+
// BUT `try_apply_command` implies applying *commands*.
81+
// The Version Byte is NOT a command.
82+
// I should treat the Version Byte as a "Header" that must be consumed
83+
// before processing commands.
84+
// I will add `consume_header` or just handle it in Shadow logic?
85+
// Simpler: `try_apply_command` handles Opcode.
86+
// The `Version` check in `wal.rs` was for the whole buffer.
87+
// I should refactor `wal.rs` to NOT expect Version byte in `try_apply_command`?
88+
// Or `WAL_OP_VERSION`?
89+
// Current `wal.rs` expects Byte 0 = Version.
90+
// If I split this, `try_apply_command` should probably just look for Opcodes.
91+
// And `ShadowKernel` handles the initial Version Byte consumption.
92+
// "Reserve byte 0 = WAL format version".
93+
// I will stick to "First byte of entire log is version".
94+
// ShadowKernel needs to know if it has processed header.
6695

67-
// Process until buffer exhausted
68-
while offset < wal_bytes.len() {
69-
let opcode = read_u8(wal_bytes, &mut offset)?;
70-
71-
match opcode {
72-
WAL_OP_INSERT => {
73-
// Format: RecordID(u32) | Dim(u16) | Values...
74-
let rid = read_u32(wal_bytes, &mut offset)?;
75-
let dim = read_u16(wal_bytes, &mut offset)?;
76-
77-
if dim as usize != D {
78-
return Err(()); // Dimension mismatch
79-
}
80-
81-
let mut vector = FxpVector::<D>::new_zeros();
82-
for i in 0..D {
83-
let val = read_i32(wal_bytes, &mut offset)?;
84-
vector.data[i] = FxpScalar(val);
85-
}
86-
87-
// Apply to Kernel
88-
let id = RecordId(rid);
89-
let cmd = Command::InsertRecord { id, vector };
90-
91-
state.apply(&cmd).map_err(|_| ())?;
92-
}
93-
_ => {
94-
return Err(()); // Invalid Opcode
96+
// Command Parsing
97+
if buf.is_empty() { return ApplyResult::Incomplete; }
98+
99+
// Peek Opcode
100+
let opcode = buf[0];
101+
offset += 1; // Consume opcode check placeholder (will re-read or just assume)
102+
103+
match opcode {
104+
WAL_OP_INSERT => {
105+
// Opcode(1) + ID(4) + Dim(2)
106+
if buf.len() < 7 { return ApplyResult::Incomplete; }
107+
108+
// Read headers to get dim (to know size)
109+
// But I don't want to advance `offset` destructively if incomplete?
110+
// `read_u*` checks bounds.
111+
112+
// Re-read carefully
113+
let mut probe = 0;
114+
let _op = read_u8(buf, &mut probe).unwrap(); // 1
115+
let rid_res = read_u32(buf, &mut probe); // +4 = 5
116+
let dim_res = read_u16(buf, &mut probe); // +2 = 7
117+
118+
if rid_res.is_err() || dim_res.is_err() { return ApplyResult::Incomplete; }
119+
120+
let _rid = rid_res.unwrap();
121+
let dim = dim_res.unwrap();
122+
123+
if dim as usize != D { return ApplyResult::Error; }
124+
125+
let payload_size = (D * 4) as usize;
126+
if buf.len() < 7 + payload_size { return ApplyResult::Incomplete; }
127+
128+
// Full command available. Execute.
129+
offset = 0;
130+
let _ = read_u8(buf, &mut offset); // Op
131+
let rid = read_u32(buf, &mut offset).unwrap();
132+
let _ = read_u16(buf, &mut offset).unwrap(); // Dim
133+
134+
let mut vector = FxpVector::<D>::new_zeros();
135+
for i in 0..D {
136+
vector.data[i] = FxpScalar(read_i32(buf, &mut offset).unwrap());
95137
}
138+
139+
// Apply
140+
let id = RecordId(rid);
141+
let cmd = Command::InsertRecord { id, vector };
142+
if state.apply(&cmd).is_err() { return ApplyResult::Error; }
143+
144+
return ApplyResult::Applied(offset);
96145
}
146+
_ => return ApplyResult::Error,
97147
}
98-
99-
Ok(())
100148
}

0 commit comments

Comments
 (0)