//!
//! META-AGENT: enable behind `audit-log` feature; add `pub mod audit;` to lib.rs;
//! add `audit-log = []` to Cargo.toml [features].
//! META-AGENT: also add `datafusion-plan = ["dep:datafusion"]` feature alias to
//! Cargo.toml [features] if not already present (gates `audit_from_plan`).
//!
//! Append-only audit log for every RLS-rewritten query. The default
//! [`InMemoryAuditSink`] is a bounded ring buffer; production deployments
//! will swap in a Lance-backed writer in a follow-up PR.
911
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
1215
@@ -17,10 +20,39 @@ pub struct AuditEntry {
1720 pub tenant_id : String ,
1821 pub actor_id : String ,
1922 /// Stable hash of the rewritten LogicalPlan or its display string.
20- /// Use std::hash::DefaultHasher (no extra deps).
23+ /// Computed via [`hash_statement`] (FNV-64a, stable across Rust versions
24+ /// and platforms — safe to persist and compare across binaries).
2125 pub statement_hash : u64 ,
2226 pub statement_kind : StatementKind ,
23- pub rls_predicates_added : u8 ,
27+ pub rls_predicates_added : u16 ,
28+ /// Optional rewritten LogicalPlan as a display string. Allows plan replay
29+ /// for retroactive policy enforcement (epiphany E3 from PR #279 outlook).
30+ /// None for sinks that don't capture plans (e.g. error-path entries).
31+ pub rewritten_plan : Option < String > ,
32+ }
33+
34+ impl AuditEntry {
35+ /// Construct an audit entry that retains the rewritten plan's display string.
36+ /// Used by the policy layer (RlsRewriter) at the moment of plan transformation.
37+ pub fn with_plan (
38+ tenant_id : impl Into < String > ,
39+ actor_id : impl Into < String > ,
40+ statement_kind : StatementKind ,
41+ plan_text : impl Into < String > ,
42+ rls_predicates_added : u16 ,
43+ ) -> Self {
44+ let plan_text = plan_text. into ( ) ;
45+ let statement_hash = hash_statement ( & plan_text) ;
46+ Self {
47+ ts_unix_ms : now_unix_ms ( ) ,
48+ tenant_id : tenant_id. into ( ) ,
49+ actor_id : actor_id. into ( ) ,
50+ statement_hash,
51+ statement_kind,
52+ rls_predicates_added,
53+ rewritten_plan : Some ( plan_text) ,
54+ }
55+ }
2456}
2557
2658/// Coarse classification of the audited statement.
@@ -35,15 +67,19 @@ pub enum StatementKind {
3567
3668/// Append-only sink. Default impl is in-memory ring buffer; production
3769/// path swaps in a Lance-backed writer in a follow-up PR.
38- pub trait AuditSink : Send + Sync {
70+ pub trait AuditSink : Send + Sync + std :: fmt :: Debug {
3971 fn append ( & self , entry : AuditEntry ) ;
4072 fn snapshot ( & self ) -> Vec < AuditEntry > ;
4173}
4274
4375/// In-memory bounded ring buffer used for tests and development.
76+ ///
77+ /// Backed by a `VecDeque` so that overflow eviction is O(1) (`pop_front`)
78+ /// rather than O(n) (`Vec::remove(0)`). Append + snapshot remain bounded
79+ /// in time regardless of capacity.
4480#[ derive( Debug ) ]
4581pub struct InMemoryAuditSink {
46- entries : Mutex < Vec < AuditEntry > > ,
82+ entries : Mutex < VecDeque < AuditEntry > > ,
4783 cap : usize ,
4884}
4985
@@ -59,7 +95,7 @@ impl InMemoryAuditSink {
5995 pub fn with_capacity ( cap : usize ) -> Self {
6096 let cap = cap. max ( 1 ) ;
6197 Self {
62- entries : Mutex :: new ( Vec :: with_capacity ( cap) ) ,
98+ entries : Mutex :: new ( VecDeque :: with_capacity ( cap) ) ,
6399 cap,
64100 }
65101 }
@@ -68,32 +104,38 @@ impl InMemoryAuditSink {
68104impl AuditSink for InMemoryAuditSink {
69105 fn append ( & self , entry : AuditEntry ) {
70106 // F-09: recover from a poisoned mutex rather than panicking.
71- let mut guard = match self . entries . lock ( ) {
72- Ok ( g) => g,
73- Err ( poisoned) => poisoned. into_inner ( ) ,
74- } ;
75- if guard. len ( ) >= self . cap {
76- // Drop oldest (ring semantics).
77- guard. remove ( 0 ) ;
107+ let mut guard = self . entries . lock ( ) . unwrap_or_else ( |e| e. into_inner ( ) ) ;
108+ if guard. len ( ) == self . cap {
109+ // Drop oldest (ring semantics) — O(1) on VecDeque.
110+ guard. pop_front ( ) ;
78111 }
79- guard. push ( entry) ;
112+ guard. push_back ( entry) ;
80113 }
81114
82115 fn snapshot ( & self ) -> Vec < AuditEntry > {
83- let guard = match self . entries . lock ( ) {
84- Ok ( g) => g,
85- Err ( poisoned) => poisoned. into_inner ( ) ,
86- } ;
87- guard. clone ( )
116+ let guard = self . entries . lock ( ) . unwrap_or_else ( |e| e. into_inner ( ) ) ;
117+ guard. iter ( ) . cloned ( ) . collect ( )
88118 }
89119}
90120
/// Stable FNV-1a 64-bit hash of a statement's text (or display form of a LogicalPlan).
///
/// **Stability guarantee:** this is the FNV-1a 64-bit algorithm with the
/// canonical offset basis `0xcbf29ce484222325` and prime `0x100000001b3`,
/// applied to the UTF-8 bytes of `stmt_text`. The result depends only on
/// those bytes, so it is identical across Rust versions, target platforms,
/// and process restarts — making the resulting `statement_hash` safe to
/// persist (e.g. in a Lance-backed audit log) and compare across binaries.
///
/// The previous implementation used `std::collections::hash_map::DefaultHasher`,
/// whose output is explicitly not stable across Rust versions and therefore
/// could not be relied on for long-lived audit records.
///
/// An empty string hashes to the offset basis.
pub fn hash_statement(stmt_text: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: XOR the byte in first, then multiply (wrapping) by the prime.
    stmt_text.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
98140
99141/// Convenience: current wall-clock time in unix milliseconds.
@@ -105,6 +147,27 @@ pub fn now_unix_ms() -> u64 {
105147 . unwrap_or ( 0 )
106148}
107149
/// Build an [`AuditEntry`] from a rewritten DataFusion `LogicalPlan`.
///
/// Used by RlsRewriter at the moment of plan transformation (epiphany E3 hook).
///
/// The plan is rendered with its `Debug` formatter (`{:?}`); that text is both
/// hashed into `statement_hash` and retained in `rewritten_plan`. Tenant and
/// actor ids are cloned from `ctx`. Construction is delegated to
/// [`AuditEntry::with_plan`] so both entry points build entries the same way.
///
/// NOTE(review): the stored hash is only as stable as the plan's `Debug`
/// output — confirm that rendering is acceptable for persisted records.
#[cfg(feature = "datafusion-plan")]
pub fn audit_from_plan(
    ctx: &crate::rls::RlsContext,
    kind: StatementKind,
    plan: &datafusion::logical_expr::LogicalPlan,
    predicates_added: u16,
) -> AuditEntry {
    AuditEntry::with_plan(
        ctx.tenant_id.clone(),
        ctx.actor_id.clone(),
        kind,
        format!("{:?}", plan),
        predicates_added,
    )
}
170+
108171#[ cfg( test) ]
109172mod tests {
110173 use super :: * ;
@@ -119,6 +182,7 @@ mod tests {
119182 statement_hash : hash_statement ( tag) ,
120183 statement_kind : StatementKind :: Select ,
121184 rls_predicates_added : 1 ,
185+ rewritten_plan : None ,
122186 }
123187 }
124188
@@ -135,6 +199,7 @@ mod tests {
135199 assert_eq ! ( snap[ 0 ] . statement_hash, expected_hash) ;
136200 assert_eq ! ( snap[ 0 ] . statement_kind, StatementKind :: Select ) ;
137201 assert_eq ! ( snap[ 0 ] . rls_predicates_added, 1 ) ;
202+ assert ! ( snap[ 0 ] . rewritten_plan. is_none( ) ) ;
138203 }
139204
140205 #[ test]
@@ -177,6 +242,17 @@ mod tests {
177242 assert_ne ! ( h1, h3, "different inputs should (with overwhelming prob) differ" ) ;
178243 }
179244
/// Pins `hash_statement` to the FNV-1a 64-bit algorithm via known vectors.
#[test]
fn hash_is_stable_fnv64a() {
    // Spot-check the FNV-1a 64-bit stability guarantee against known vectors.
    // Empty string → offset basis.
    assert_eq!(hash_statement(""), 0xcbf29ce484222325);
    // "a" → 0xaf63dc4c8601ec8c (canonical FNV-1a 64-bit test vector).
    assert_eq!(hash_statement("a"), 0xaf63dc4c8601ec8c);
    // "foobar" → 0x85944171f73967e8 (canonical test vector).
    assert_eq!(hash_statement("foobar"), 0x85944171f73967e8);
}
255+
180256 #[ test]
181257 fn zero_capacity_is_normalized_to_one ( ) {
182258 let sink = InMemoryAuditSink :: with_capacity ( 0 ) ;
@@ -186,4 +262,49 @@ mod tests {
186262 assert_eq ! ( snap. len( ) , 1 ) ;
187263 assert_eq ! ( snap[ 0 ] . tenant_id, "tenant-b" ) ;
188264 }
265+
/// `AuditEntry::with_plan` keeps the plan text and hashes exactly that text.
#[test]
fn with_plan_constructor_captures_plan_text() {
    let entry = AuditEntry::with_plan(
        "tenant-x",
        "actor-x",
        StatementKind::Select,
        "Filter: tenant_id = 'tenant-x'\n TableScan: calls",
        2,
    );
    assert_eq!(entry.tenant_id, "tenant-x");
    assert_eq!(entry.actor_id, "actor-x");
    assert_eq!(entry.statement_kind, StatementKind::Select);
    assert_eq!(entry.rls_predicates_added, 2);
    // Moving the plan out is a partial move; the `Copy` hash field stays usable.
    let plan = entry.rewritten_plan.expect("plan retained");
    assert!(plan.starts_with("Filter:"));
    assert_eq!(entry.statement_hash, hash_statement(&plan));
}
283+
/// Eight threads append 100 entries each; none may be lost.
#[test]
fn concurrent_appends_no_loss() {
    // 8 × 100 = 800 entries, well under the 10_000 capacity, so nothing is evicted.
    // `Arc` and `thread` are fully qualified so the test does not depend on
    // module-level imports.
    let sink = std::sync::Arc::new(InMemoryAuditSink::with_capacity(10_000));
    let handles: Vec<_> = (0..8)
        .map(|t| {
            let s = std::sync::Arc::clone(&sink);
            std::thread::spawn(move || {
                for i in 0..100 {
                    s.append(AuditEntry {
                        ts_unix_ms: now_unix_ms(),
                        tenant_id: format!("t{}", t),
                        actor_id: format!("a{}", i),
                        statement_hash: hash_statement(&format!("q{}-{}", t, i)),
                        statement_kind: StatementKind::Select,
                        rls_predicates_added: 1,
                        rewritten_plan: None,
                    });
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(sink.snapshot().len(), 800);
}
189310}
0 commit comments