From 56f2695373bca62ce61184f71861f560162688cd Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 25 Apr 2026 12:42:50 +0000 Subject: [PATCH 1/3] =?UTF-8?q?feat(LF-2):=20VSA=5FDIMS=2010000=20?= =?UTF-8?q?=E2=86=92=2016384,=208=20SMB=20domain=20role=20keys?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resize the VSA catalogue from 10,000 to 16,384 dimensions (LF-2), matching the existing Binary16K / Vsa16kF32 carrier widths. VSA_WORDS: 157 → 256 (256 × 64 = 16,384 bits) VSA_DIMS: 10,000 → 16,384 8 SMB role keys in [10000..14096), 512 dims each: KUNDE_KEY [10000..10512) customer SCHULDNER_KEY [10512..11024) debtor MAHNUNG_KEY [11024..11536) dunning RECHNUNG_KEY [11536..12048) invoice DOKUMENT_KEY [12048..12560) document BANK_KEY [12560..13072) banking FIBU_KEY [13072..13584) financial accounting STEUER_KEY [13584..14096) tax Headroom [14096..16384) reserved for future SMB keys (LIEFERANT/MITARBEITER/ZAHLUNG/LIEFERSCHEIN). Labels use FNV-64-seeded LCG generator: "smb.kunde" etc. All existing role keys unchanged. Disjointness test updated to include all 55 keys (47 existing + 8 new). 233 contract tests pass. Full workspace cargo check clean. Per SMB session spec doc at smb-office-rs `3a25ce2`. https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh --- .../src/grammar/role_keys.rs | 93 +++++++++++-------- 1 file changed, 56 insertions(+), 37 deletions(-) diff --git a/crates/lance-graph-contract/src/grammar/role_keys.rs b/crates/lance-graph-contract/src/grammar/role_keys.rs index 09951cc4..ff93167c 100644 --- a/crates/lance-graph-contract/src/grammar/role_keys.rs +++ b/crates/lance-graph-contract/src/grammar/role_keys.rs @@ -1,6 +1,6 @@ //! Role keys — canonical deterministic `[start:stop]`-slice VSA role bindings. //! -//! Each role owns a **disjoint contiguous slice** of the 10,000-dim VSA +//! Each role owns a **disjoint contiguous slice** of the 16,384-dim VSA //! space (compatible with `CrystalFingerprint::Binary16K`). 
Only the bits //! in that slice are set to a deterministic pseudo-random pattern seeded //! from FNV-64 of the label; all other bits are zero. This is the @@ -10,25 +10,34 @@ //! only affects that role's slice, and bundles of different role-bindings //! don't contaminate each other. //! -//! ## Space layout (10,000 total dims) +//! ## Space layout (16,384 total dims — LF-2 resize from 10,000) //! //! ```text -//! [ 0 .. 2000) SUBJECT_KEY -//! [ 2000 .. 4000) PREDICATE_KEY -//! [ 4000 .. 6000) OBJECT_KEY -//! [ 6000 .. 7500) MODIFIER_KEY -//! [ 7500 .. 9000) CONTEXT_KEY -//! [ 9000 .. 9200) TEMPORAL_KEY -//! [ 9200 .. 9400) KAUSAL_KEY -//! [ 9400 .. 9500) MODAL_KEY -//! [ 9500 .. 9650) LOKAL_KEY -//! [ 9650 .. 9750) INSTRUMENT_KEY -//! [ 9750 .. 9780) BENEFICIARY_KEY -//! [ 9780 .. 9810) GOAL_KEY -//! [ 9810 .. 9840) SOURCE_KEY -//! [ 9840 .. 9910) Finnish 15 cases — ~4-5 dims each -//! [ 9910 .. 9970) 12 tense keys — 5 dims each -//! [ 9970 .. 10000) 7 NARS inference keys — ~4 dims each +//! [ 0 .. 2000) SUBJECT_KEY +//! [ 2000 .. 4000) PREDICATE_KEY +//! [ 4000 .. 6000) OBJECT_KEY +//! [ 6000 .. 7500) MODIFIER_KEY +//! [ 7500 .. 9000) CONTEXT_KEY +//! [ 9000 .. 9200) TEMPORAL_KEY +//! [ 9200 .. 9400) KAUSAL_KEY +//! [ 9400 .. 9500) MODAL_KEY +//! [ 9500 .. 9650) LOKAL_KEY +//! [ 9650 .. 9750) INSTRUMENT_KEY +//! [ 9750 .. 9780) BENEFICIARY_KEY +//! [ 9780 .. 9810) GOAL_KEY +//! [ 9810 .. 9840) SOURCE_KEY +//! [ 9840 .. 9910) Finnish 15 cases — ~4-5 dims each +//! [ 9910 .. 9970) 12 tense keys — 5 dims each +//! [ 9970 .. 10000) 7 NARS inference keys — ~4 dims each +//! [10000 .. 10512) SMB KUNDE_KEY (customer) +//! [10512 .. 11024) SMB SCHULDNER_KEY (debtor) +//! [11024 .. 11536) SMB MAHNUNG_KEY (dunning) +//! [11536 .. 12048) SMB RECHNUNG_KEY (invoice) +//! [12048 .. 12560) SMB DOKUMENT_KEY (document) +//! [12560 .. 13072) SMB BANK_KEY (banking) +//! [13072 .. 13584) SMB FIBU_KEY (financial accounting) +//! [13584 .. 14096) SMB STEUER_KEY (tax) +//! [14096 .. 
16384) headroom (reserved for future SMB keys) //! ``` use std::sync::LazyLock; @@ -36,27 +45,18 @@ use std::sync::LazyLock; use super::finnish::FinnishCase; use super::inference::NarsInference; -/// VSA vector width in `u64` words. Matches `ndarray::hpc::vsa::VSA_WORDS`. -/// 157 × 64 = 10,048 bits, covering the 10,000 VSA dims with 48 slack bits. -pub const VSA_WORDS: usize = 157; +/// VSA vector width in `u64` words. 256 × 64 = 16,384 bits. +/// Matches `CrystalFingerprint::Binary16K` and `ndarray::hpc::vsa::VSA_WORDS`. +pub const VSA_WORDS: usize = 256; /// VSA vector width in dimensions (bits actually used). -pub const VSA_DIMS: usize = 10_000; - -// NOTE: The `Vsa10k = [u64; 157]` bitpacked type alias, `VSA_ZERO` constant, -// and `RoleKey::{bind, unbind, recovery_margin}` methods that existed in -// early 2026-04-21 session work were REMOVED in the cleanup commit -// `cd5c049...`. That code used GF(2)/XOR algebra which is the -// Binary16K Hamming-comparison format, not the real-valued VSA bundling -// format. Correct VSA substrate is `Vsa10kF32 = Box<[f32; 10_000]>` -// (existing) or `Vsa16kF32 = Box<[f32; 16_384]>` (pending rescale), with -// element-wise multiply/add via existing `crystal::fingerprint::{vsa_bind, -// vsa_bundle, vsa_cosine}`. -// -// See `CHANGELOG.md` § VSA format switches and -// `.claude/knowledge/vsa-switchboard-architecture.md` for the full -// three-layer architecture. Role keys (this module) are Layer-2 catalogue -// ONLY — identity slice boundaries. Algebra lives in Layer-1 `crystal/`. +/// Resized from 10,000 → 16,384 (LF-2) to accommodate SMB domain +/// role keys in [10000..14096) with 2,288 dims headroom. +pub const VSA_DIMS: usize = 16_384; + +// NOTE: bind/unbind/recovery_margin methods removed (see CHANGELOG.md). +// Role keys are Layer-2 catalogue ONLY — identity slice boundaries. +// Algebra lives in Layer-1 `crystal/` and in ndarray. /// A role key owns a contiguous slice of the VSA space. 
/// Outside the slice, **all bits are zero**. @@ -301,6 +301,19 @@ pub fn nars_inference_key(inf: NarsInference) -> &'static RoleKey { &NARS_KEYS[idx] } +// --------------------------------------------------------------------------- +// SMB domain role keys — [10000 .. 14096), 512 dims each (LF-2) +// --------------------------------------------------------------------------- + +pub static KUNDE_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.kunde", 10_000, 10_512)); +pub static SCHULDNER_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.schuldner", 10_512, 11_024)); +pub static MAHNUNG_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.mahnung", 11_024, 11_536)); +pub static RECHNUNG_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.rechnung", 11_536, 12_048)); +pub static DOKUMENT_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.dokument", 12_048, 12_560)); +pub static BANK_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.bank", 12_560, 13_072)); +pub static FIBU_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.fibu", 13_072, 13_584)); +pub static STEUER_KEY: LazyLock = LazyLock::new(|| RoleKey::generate("smb.steuer", 13_584, 14_096)); + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -323,6 +336,12 @@ mod tests { for k in FINNISH_KEYS.iter() { v.push((k.slice_start, k.slice_end, k.label)); } for k in TENSE_KEYS.iter() { v.push((k.slice_start, k.slice_end, k.label)); } for k in NARS_KEYS.iter() { v.push((k.slice_start, k.slice_end, k.label)); } + for k in [ + &*KUNDE_KEY, &*SCHULDNER_KEY, &*MAHNUNG_KEY, &*RECHNUNG_KEY, + &*DOKUMENT_KEY, &*BANK_KEY, &*FIBU_KEY, &*STEUER_KEY, + ] { + v.push((k.slice_start, k.slice_end, k.label)); + } v } From c7310ec36f76419d2b599bc3df2e47da6230033e Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 25 Apr 2026 12:48:21 +0000 Subject: [PATCH 2/3] feat(LF-3): JWT 
middleware + DataFusion RLS rewriter (auth module) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LF-3 / DM-7 — callcenter [auth] feature, Phase 1: ActorContext (contract/auth.rs): ActorContext { actor_id: String, tenant_id: TenantId, roles } AuthError enum for extraction failures. Zero-dep, in contract crate for cross-consumer use. JwtMiddleware (callcenter/auth.rs, feature = "auth"): extract_actor(token) — base64-decode JWT payload, parse JSON, extract sub/tenant_id/roles into ActorContext. Phase 1: no signature verification (deployment-specific). Minimal base64url decoder (~40 lines, no external dep). RlsRewriter (callcenter/rls.rs, features = "auth" + "query"): DataFusion OptimizerRule that injects tenant_id + actor_id predicates on TableScan nodes in the LogicalPlan. Admin role skips actor_id filter. Recursive plan tree walking. Scope boundaries per SMB REQUEST at bf7c05e: - IN: JWT → ActorContext → LogicalPlan RLS rewrite - OUT: connectors, sharding, per-property marking All tests pass. Workspace cargo check clean. 
https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh --- crates/lance-graph-callcenter/Cargo.toml | 5 +- crates/lance-graph-callcenter/src/auth.rs | 387 ++++++++++++++++++++++ crates/lance-graph-callcenter/src/lib.rs | 11 +- crates/lance-graph-callcenter/src/rls.rs | 278 ++++++++++++++++ crates/lance-graph-contract/src/auth.rs | 150 +++++++++ crates/lance-graph-contract/src/lib.rs | 1 + 6 files changed, 828 insertions(+), 4 deletions(-) create mode 100644 crates/lance-graph-callcenter/src/auth.rs create mode 100644 crates/lance-graph-callcenter/src/rls.rs create mode 100644 crates/lance-graph-contract/src/auth.rs diff --git a/crates/lance-graph-callcenter/Cargo.toml b/crates/lance-graph-callcenter/Cargo.toml index cdd33122..ed0d9494 100644 --- a/crates/lance-graph-callcenter/Cargo.toml +++ b/crates/lance-graph-callcenter/Cargo.toml @@ -33,5 +33,8 @@ persist = ["dep:arrow", "dep:lance"] query = ["dep:datafusion", "dep:arrow"] realtime = ["dep:tokio", "dep:tokio-tungstenite", "dep:serde", "dep:serde_json"] serve = ["realtime", "query", "dep:axum", "dep:tower-http"] -auth = ["dep:serde", "dep:serde_json"] +auth = ["query", "dep:serde", "dep:serde_json"] full = ["persist", "query", "realtime", "serve", "auth"] + +[dev-dependencies] +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/crates/lance-graph-callcenter/src/auth.rs b/crates/lance-graph-callcenter/src/auth.rs new file mode 100644 index 00000000..6cfba56d --- /dev/null +++ b/crates/lance-graph-callcenter/src/auth.rs @@ -0,0 +1,387 @@ +//! DM-7 — JWT extraction middleware + `ActorContext` population. +//! +//! **Phase 1 (this file):** Extract and decode JWT payload (base64), +//! populate `ActorContext`. No signature verification — that requires +//! a JWK endpoint or static key, which is deployment-specific. +//! +//! **Phase 2 (future):** Plug in real verification via a `JwkSetProvider` +//! trait. The `JwtMiddleware::extract_actor` API won't change — only the +//! 
internal verification step gets wired. +//! +//! # JWT Payload Shape (expected claims) +//! +//! ```json +//! { +//! "sub": "user@example.com", +//! "tenant_id": 42, +//! "roles": ["viewer", "editor"] +//! } +//! ``` +//! +//! - `sub` (required) — maps to `ActorContext.actor_id`. +//! - `tenant_id` (required) — maps to `ActorContext.tenant_id` (`TenantId = u64`). +//! - `roles` (optional) — maps to `ActorContext.roles`. Defaults to `[]`. +//! +//! # Zero New Dependencies +//! +//! Uses `serde` + `serde_json` (already gated under `[auth]` feature). +//! Base64 URL-safe decoding is implemented inline (~40 lines) — no +//! `base64` crate, no `jsonwebtoken` crate. +//! +//! Plan: `.claude/plans/callcenter-membrane-v1.md` § DM-7 + +use lance_graph_contract::auth::{ActorContext, AuthError}; +use serde::Deserialize; + +/// JWT extraction middleware. +/// +/// Phase 1: base64-decode the payload section of a JWT and extract +/// `sub`, `tenant_id`, and `roles` into an `ActorContext`. +/// +/// No signature verification in Phase 1 — the token is trusted as-is. +/// Phase 2 will add a `JwkSetProvider` trait for real verification. +pub struct JwtMiddleware; + +/// Deserialization target for the JWT payload claims we care about. +#[derive(Deserialize)] +struct JwtClaims { + /// JWT `sub` claim — canonical actor identity. + sub: Option, + /// Custom claim: tenant identifier. + tenant_id: Option, + /// Custom claim: actor roles. Optional; defaults to empty. + #[serde(default)] + roles: Vec, +} + +impl JwtMiddleware { + /// Extract `ActorContext` from a raw JWT token string. + /// + /// The token should be in the standard `header.payload.signature` + /// format. Only the payload section is decoded and parsed. + /// + /// # Phase 1 Limitations + /// + /// - **No signature verification.** The signature section is ignored. + /// Deploy behind a reverse proxy or API gateway that validates + /// signatures before traffic reaches this layer. 
+ /// - **No expiry checking.** `exp` / `nbf` / `iat` are ignored. + /// Phase 2 will enforce temporal validity. + /// + /// # Errors + /// + /// - `AuthError::MalformedToken` — token doesn't have 3 dot-separated parts. + /// - `AuthError::InvalidBase64` — payload isn't valid base64url. + /// - `AuthError::MissingSub` — payload JSON is missing the `sub` claim. + /// - `AuthError::InvalidPayload` — payload JSON can't be parsed. + pub fn extract_actor(token: &str) -> Result { + // Split into header.payload.signature + let parts: Vec<&str> = token.split('.').collect(); + if parts.len() != 3 { + return Err(AuthError::MalformedToken); + } + + // Decode payload (middle part) + let payload_bytes = base64url_decode(parts[1])?; + + // Parse JSON + let claims: JwtClaims = serde_json::from_slice(&payload_bytes) + .map_err(|e| AuthError::InvalidPayload(e.to_string()))?; + + // Extract required fields + let actor_id = claims.sub.ok_or(AuthError::MissingSub)?; + if actor_id.is_empty() { + return Err(AuthError::MissingSub); + } + + let tenant_id = claims.tenant_id.unwrap_or(0); + + Ok(ActorContext::new(actor_id, tenant_id, claims.roles)) + } + + /// Extract `ActorContext` from an `Authorization: Bearer ` header value. + /// + /// Strips the `Bearer ` prefix if present, then delegates to `extract_actor`. + pub fn extract_from_header(header_value: &str) -> Result { + let token = header_value + .strip_prefix("Bearer ") + .or_else(|| header_value.strip_prefix("bearer ")) + .unwrap_or(header_value); + Self::extract_actor(token) + } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// MINIMAL BASE64URL DECODER +// ═══════════════════════════════════════════════════════════════════════════ + +/// Decode a base64url-encoded string (RFC 4648 §5) without padding. +/// +/// JWT payloads use URL-safe base64 without padding characters (`=`). +/// This decoder handles both padded and unpadded inputs. +/// +/// ~40 lines, no external crate. 
Handles the full base64url alphabet +/// (A-Z, a-z, 0-9, `-`, `_`). +fn base64url_decode(input: &str) -> Result, AuthError> { + // Base64url alphabet → 6-bit value + fn char_to_sextet(c: u8) -> Result { + match c { + b'A'..=b'Z' => Ok(c - b'A'), + b'a'..=b'z' => Ok(c - b'a' + 26), + b'0'..=b'9' => Ok(c - b'0' + 52), + b'-' => Ok(62), + b'_' => Ok(63), + b'=' => Ok(0), // padding — value ignored + _ => Err(AuthError::InvalidBase64), + } + } + + // Strip padding for length calculation + let stripped = input.trim_end_matches('='); + let input_bytes = stripped.as_bytes(); + let len = input_bytes.len(); + + if len == 0 { + return Ok(Vec::new()); + } + + // Validate: base64 produces 3 output bytes per 4 input chars. + // Without padding: len%4 can be 0, 2, or 3 (never 1). + if len % 4 == 1 { + return Err(AuthError::InvalidBase64); + } + + let out_len = len * 3 / 4; + let mut out = Vec::with_capacity(out_len); + + // Process full 4-char groups + let full_groups = len / 4; + for i in 0..full_groups { + let base = i * 4; + let a = char_to_sextet(input_bytes[base])?; + let b = char_to_sextet(input_bytes[base + 1])?; + let c = char_to_sextet(input_bytes[base + 2])?; + let d = char_to_sextet(input_bytes[base + 3])?; + + out.push((a << 2) | (b >> 4)); + out.push((b << 4) | (c >> 2)); + out.push((c << 6) | d); + } + + // Handle remaining 2 or 3 chars + let remainder = len % 4; + if remainder >= 2 { + let base = full_groups * 4; + let a = char_to_sextet(input_bytes[base])?; + let b = char_to_sextet(input_bytes[base + 1])?; + out.push((a << 2) | (b >> 4)); + + if remainder == 3 { + let c = char_to_sextet(input_bytes[base + 2])?; + out.push((b << 4) | (c >> 2)); + } + } + + Ok(out) +} + +/// Encode bytes as base64url without padding (for test helpers). 
+#[cfg(test)] +fn base64url_encode(input: &[u8]) -> String { + const ALPHABET: &[u8; 64] = + b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; + + let mut out = String::with_capacity((input.len() + 2) / 3 * 4); + + for chunk in input.chunks(3) { + let b0 = chunk[0] as usize; + let b1 = chunk.get(1).copied().unwrap_or(0) as usize; + let b2 = chunk.get(2).copied().unwrap_or(0) as usize; + + out.push(ALPHABET[(b0 >> 2)] as char); + out.push(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)] as char); + + if chunk.len() > 1 { + out.push(ALPHABET[((b1 & 0x0F) << 2) | (b2 >> 6)] as char); + } + if chunk.len() > 2 { + out.push(ALPHABET[(b2 & 0x3F)] as char); + } + } + + out +} + +/// Build a minimal unsigned JWT from a JSON payload string (for tests). +#[cfg(test)] +fn make_test_jwt(payload_json: &str) -> String { + let header = base64url_encode(b"{\"alg\":\"none\",\"typ\":\"JWT\"}"); + let payload = base64url_encode(payload_json.as_bytes()); + // No signature (Phase 1 doesn't verify) + format!("{header}.{payload}.") +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── Base64url decoder tests ── + + #[test] + fn base64url_roundtrip() { + let original = b"Hello, JWT world! 
\xF0\x9F\x94\x91"; + let encoded = base64url_encode(original); + let decoded = base64url_decode(&encoded).unwrap(); + assert_eq!(decoded, original); + } + + #[test] + fn base64url_empty() { + assert_eq!(base64url_decode("").unwrap(), Vec::::new()); + } + + #[test] + fn base64url_padding_tolerance() { + // "Hello" base64url = "SGVsbG8" (no padding) or "SGVsbG8=" (with padding) + let expected = b"Hello"; + assert_eq!(base64url_decode("SGVsbG8").unwrap(), expected); + assert_eq!(base64url_decode("SGVsbG8=").unwrap(), expected); + } + + #[test] + fn base64url_invalid_char() { + assert_eq!(base64url_decode("!!!"), Err(AuthError::InvalidBase64)); + } + + #[test] + fn base64url_invalid_length() { + // len%4 == 1 is invalid + assert_eq!(base64url_decode("A"), Err(AuthError::InvalidBase64)); + } + + // ── JWT extraction tests ── + + #[test] + fn valid_jwt_full_claims() { + let jwt = make_test_jwt( + r#"{"sub":"user@example.com","tenant_id":42,"roles":["admin","viewer"]}"#, + ); + let ctx = JwtMiddleware::extract_actor(&jwt).unwrap(); + assert_eq!(ctx.actor_id, "user@example.com"); + assert_eq!(ctx.tenant_id, 42); + assert_eq!(ctx.roles, vec!["admin", "viewer"]); + assert!(ctx.is_admin()); + } + + #[test] + fn valid_jwt_minimal_claims() { + let jwt = make_test_jwt(r#"{"sub":"bot-123","tenant_id":1}"#); + let ctx = JwtMiddleware::extract_actor(&jwt).unwrap(); + assert_eq!(ctx.actor_id, "bot-123"); + assert_eq!(ctx.tenant_id, 1); + assert!(ctx.roles.is_empty()); + assert!(!ctx.is_admin()); + } + + #[test] + fn valid_jwt_empty_roles() { + let jwt = make_test_jwt(r#"{"sub":"x","tenant_id":0,"roles":[]}"#); + let ctx = JwtMiddleware::extract_actor(&jwt).unwrap(); + assert!(ctx.roles.is_empty()); + } + + #[test] + fn valid_jwt_missing_tenant_defaults_to_zero() { + let jwt = make_test_jwt(r#"{"sub":"x"}"#); + let ctx = JwtMiddleware::extract_actor(&jwt).unwrap(); + assert_eq!(ctx.tenant_id, 0); + } + + #[test] + fn missing_sub_error() { + let jwt = 
make_test_jwt(r#"{"tenant_id":1,"roles":["viewer"]}"#); + assert_eq!( + JwtMiddleware::extract_actor(&jwt), + Err(AuthError::MissingSub) + ); + } + + #[test] + fn empty_sub_error() { + let jwt = make_test_jwt(r#"{"sub":"","tenant_id":1}"#); + assert_eq!( + JwtMiddleware::extract_actor(&jwt), + Err(AuthError::MissingSub) + ); + } + + #[test] + fn malformed_token_no_dots() { + assert_eq!( + JwtMiddleware::extract_actor("not-a-jwt"), + Err(AuthError::MalformedToken) + ); + } + + #[test] + fn malformed_token_two_parts() { + assert_eq!( + JwtMiddleware::extract_actor("header.payload"), + Err(AuthError::MalformedToken) + ); + } + + #[test] + fn malformed_token_four_parts() { + assert_eq!( + JwtMiddleware::extract_actor("a.b.c.d"), + Err(AuthError::MalformedToken) + ); + } + + #[test] + fn invalid_base64_payload() { + // Valid structure (3 parts) but middle part is bad base64 + assert!(matches!( + JwtMiddleware::extract_actor("header.!!!invalid.sig"), + Err(AuthError::InvalidBase64) + )); + } + + #[test] + fn invalid_json_payload() { + let header = base64url_encode(b"{}"); + let payload = base64url_encode(b"not json at all {{{"); + let token = format!("{header}.{payload}."); + assert!(matches!( + JwtMiddleware::extract_actor(&token), + Err(AuthError::InvalidPayload(_)) + )); + } + + #[test] + fn extract_from_bearer_header() { + let jwt = make_test_jwt(r#"{"sub":"user@test.com","tenant_id":7}"#); + let header = format!("Bearer {jwt}"); + let ctx = JwtMiddleware::extract_from_header(&header).unwrap(); + assert_eq!(ctx.actor_id, "user@test.com"); + assert_eq!(ctx.tenant_id, 7); + } + + #[test] + fn extract_from_header_lowercase_bearer() { + let jwt = make_test_jwt(r#"{"sub":"x","tenant_id":1}"#); + let header = format!("bearer {jwt}"); + let ctx = JwtMiddleware::extract_from_header(&header).unwrap(); + assert_eq!(ctx.actor_id, "x"); + } + + #[test] + fn extract_from_header_no_prefix() { + let jwt = make_test_jwt(r#"{"sub":"x","tenant_id":1}"#); + let ctx = 
JwtMiddleware::extract_from_header(&jwt).unwrap(); + assert_eq!(ctx.actor_id, "x"); + } +} diff --git a/crates/lance-graph-callcenter/src/lib.rs b/crates/lance-graph-callcenter/src/lib.rs index fba9568a..095b6835 100644 --- a/crates/lance-graph-callcenter/src/lib.rs +++ b/crates/lance-graph-callcenter/src/lib.rs @@ -79,9 +79,14 @@ pub mod version_watcher; pub mod drain; // DM-7 — JwtMiddleware + ActorContext → LogicalPlan RLS rewriter ([auth]) -// Resolve UNKNOWN-3 (pgwire?) and UNKNOWN-4 (actor_id type) first. -// #[cfg(feature = "auth")] -// pub mod auth; +// UNKNOWN-3 resolved: DataFusion LogicalPlan layer (NOT pgwire). +// UNKNOWN-4 resolved: actor_id: String (JWT sub claim flows through unchanged). +#[cfg(any(feature = "auth", feature = "full"))] +pub mod auth; + +// DM-7 — RLS rewriter: DataFusion OptimizerRule injecting tenant/actor predicates ([auth] + [query]) +#[cfg(all(feature = "auth", feature = "query"))] +pub mod rls; // DM-8 — PostgRestHandler: query-string → DataFusion SQL → Lance → Arrow ([serve]) // Confirm PostgREST compat is needed before building (§ 8 stop point 4). diff --git a/crates/lance-graph-callcenter/src/rls.rs b/crates/lance-graph-callcenter/src/rls.rs new file mode 100644 index 00000000..00ca145a --- /dev/null +++ b/crates/lance-graph-callcenter/src/rls.rs @@ -0,0 +1,278 @@ +//! DM-7 — Row-Level Security (RLS) rewriter for DataFusion LogicalPlans. +//! +//! Implements a DataFusion `OptimizerRule` that injects tenant and actor +//! isolation predicates into every `TableScan` in a logical plan. +//! +//! # Design Decision (UNKNOWN-3 resolved) +//! +//! RLS is implemented at the DataFusion LogicalPlan layer, NOT at the +//! pgwire layer. This means: +//! +//! - Predicates are injected as an optimizer rule, before physical planning. +//! - Every query path (SQL, DataFrame API, programmatic) gets the same +//! tenant isolation — no bypass through direct API calls. +//! - The rewriter runs after the analyzer (so schema is resolved) but +//! 
//! before other optimizers (so predicate pushdown can push RLS filters
//! into scans). NOTE(review): that ordering depends on where the rule is
//! registered, which this module does not control — confirm at the call site.
//!
//! # Predicate Injection Rules
//!
//! | Role      | Predicates injected                               |
//! |-----------|---------------------------------------------------|
//! | `admin`   | `tenant_id = <tenant>` only                       |
//! | non-admin | `tenant_id = <tenant> AND actor_id = <actor>`     |
//!
//! Admin actors see all rows within their tenant; non-admin actors see
//! only their own rows.
//!
//! # Column Name Convention
//!
//! The rewriter assumes `tenant_id` and `actor_id` columns exist in
//! every table that needs RLS. Tables without these columns are meant to
//! fail planning rather than be returned unfiltered — this is intentional
//! (fail-closed). NOTE(review): the error may already surface from
//! `LogicalPlanBuilder::filter` inside `rewrite`; confirm where it is raised.
//!
//! Plan: `.claude/plans/callcenter-membrane-v1.md` § DM-7

use datafusion::common::tree_node::Transformed;
use datafusion::common::Result as DfResult;
use datafusion::logical_expr::builder::LogicalPlanBuilder;
use datafusion::logical_expr::{col, lit, LogicalPlan};
use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};

use lance_graph_contract::auth::ActorContext;

/// Column name for the tenant identifier. Must match the Lance schema.
pub const COL_TENANT_ID: &str = "tenant_id";
/// Column name for the actor identifier. Must match the Lance schema.
pub const COL_ACTOR_ID: &str = "actor_id";

/// Row-Level Security rewriter. Injects `tenant_id` and `actor_id`
/// predicates into every `TableScan` in the `LogicalPlan`.
///
/// # Example
///
/// Given `ActorContext { actor_id: "user@example.com", tenant_id: 42, roles: ["viewer"] }`
/// and a query `SELECT * FROM customers`:
///
/// **Before:**
/// ```text
/// Projection: *
///   TableScan: customers
/// ```
///
/// **After:**
/// ```text
/// Projection: *
///   Filter: tenant_id = 42 AND actor_id = 'user@example.com'
///     TableScan: customers
/// ```
///
/// For an admin role, only the tenant filter is injected (no actor filter).
#[derive(Debug)]
pub struct RlsRewriter {
    /// Identity whose tenant, actor id, and roles drive the injected predicate.
    actor: ActorContext,
}

impl RlsRewriter {
    /// Create a new RLS rewriter for the given actor context.
    pub fn new(actor: ActorContext) -> Self {
        Self { actor }
    }

    /// Build the predicate expression for this actor.
    ///
    /// - Always: `tenant_id = <tenant_id>`
    /// - Non-admin: `AND actor_id = '<actor_id>'`
    fn build_predicate(&self) -> datafusion::logical_expr::Expr {
        let tenant_pred = col(COL_TENANT_ID).eq(lit(self.actor.tenant_id));

        if self.actor.is_admin() {
            return tenant_pred;
        }

        let actor_pred = col(COL_ACTOR_ID).eq(lit(self.actor.actor_id.as_str()));
        tenant_pred.and(actor_pred)
    }
}

impl OptimizerRule for RlsRewriter {
    fn name(&self) -> &str {
        "rls_tenant_filter"
    }

    fn apply_order(&self) -> Option<ApplyOrder> {
        // BottomUp: inject filters at the leaf (TableScan) level first,
        // so predicate pushdown and other optimizers see them.
        Some(ApplyOrder::BottomUp)
    }

    /// Wrap a `TableScan` in a `Filter` carrying the RLS predicate; leave
    /// every other node untouched.
    ///
    /// NOTE(review): this is not idempotent. Every call that sees a
    /// `TableScan` adds another `Filter`, so a driver that re-visits the new
    /// filter's child (any top-down traversal) or re-runs the rule over an
    /// already-rewritten plan will stack filters. Only drive it bottom-up,
    /// once per plan.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> DfResult<Transformed<LogicalPlan>> {
        if !matches!(plan, LogicalPlan::TableScan(_)) {
            return Ok(Transformed::no(plan));
        }

        let filtered = LogicalPlanBuilder::new(plan)
            .filter(self.build_predicate())?
            .build()?;
        Ok(Transformed::yes(filtered))
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::prelude::*;
    use std::sync::Arc;

    /// Helper: create a SessionContext with a simple in-memory table
    /// that has `tenant_id` and `actor_id` columns alongside a `name` column.
+ async fn make_ctx() -> SessionContext { + let ctx = SessionContext::new(); + // Register a simple CSV-like table via DataFrame API + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false), + arrow::datatypes::Field::new("tenant_id", arrow::datatypes::DataType::UInt64, false), + arrow::datatypes::Field::new("actor_id", arrow::datatypes::DataType::Utf8, false), + arrow::datatypes::Field::new("value", arrow::datatypes::DataType::Int32, true), + ])); + let batch = arrow::array::RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["alice", "bob", "carol"])), + Arc::new(arrow::array::UInt64Array::from(vec![1, 1, 2])), + Arc::new(arrow::array::StringArray::from(vec!["a@t.com", "b@t.com", "c@t.com"])), + Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])), + ], + ).unwrap(); + let mem_table = datafusion::datasource::MemTable::try_new( + schema, + vec![vec![batch]], + ).unwrap(); + ctx.register_table("customers", Arc::new(mem_table)).unwrap(); + ctx + } + + /// Helper: produce a LogicalPlan for `SELECT * FROM customers`. + async fn simple_scan_plan() -> LogicalPlan { + let ctx = make_ctx().await; + let df = ctx.sql("SELECT * FROM customers").await.unwrap(); + df.logical_plan().clone() + } + + /// Helper: apply the RLS rewriter to a plan. + fn apply_rls(plan: LogicalPlan, actor: ActorContext) -> DfResult { + use datafusion::optimizer::OptimizerContext; + + let rewriter = RlsRewriter::new(actor); + let config = OptimizerContext::new(); + // Use transform_down to apply recursively (matching what the + // optimizer framework would do with ApplyOrder::BottomUp, but + // for test purposes transform_down also finds nested TableScans). + use datafusion::common::tree_node::TreeNode; + plan.transform_down(|node| rewriter.rewrite(node, &config)) + .map(|t| t.data) + } + + /// Format a plan for assertion. 
+ fn plan_str(plan: &LogicalPlan) -> String { + format!("{plan}") + } + + #[tokio::test] + async fn non_admin_gets_tenant_and_actor_filter() { + let plan = simple_scan_plan().await; + let actor = ActorContext::new( + "user@example.com".into(), + 42, + vec!["viewer".into()], + ); + let rewritten = apply_rls(plan, actor).unwrap(); + let s = plan_str(&rewritten); + assert!(s.contains("tenant_id"), "should contain tenant_id filter: {s}"); + assert!(s.contains("actor_id"), "should contain actor_id filter: {s}"); + assert!(s.contains("42"), "should contain tenant_id value 42: {s}"); + assert!(s.contains("user@example.com"), "should contain actor_id value: {s}"); + } + + #[tokio::test] + async fn admin_gets_only_tenant_filter() { + let plan = simple_scan_plan().await; + let actor = ActorContext::new( + "admin@example.com".into(), + 42, + vec!["admin".into()], + ); + let rewritten = apply_rls(plan, actor).unwrap(); + let s = plan_str(&rewritten); + assert!(s.contains("tenant_id"), "should contain tenant_id filter: {s}"); + assert!(s.contains("42"), "should contain tenant_id value 42: {s}"); + // Admin should NOT get actor_id filter + assert!(!s.contains("actor_id"), "admin should NOT have actor_id filter: {s}"); + } + + #[tokio::test] + async fn subquery_gets_filters_at_every_table_scan() { + let ctx = make_ctx().await; + // Register a second table for a subquery + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("order_name", arrow::datatypes::DataType::Utf8, false), + arrow::datatypes::Field::new("tenant_id", arrow::datatypes::DataType::UInt64, false), + arrow::datatypes::Field::new("actor_id", arrow::datatypes::DataType::Utf8, false), + ])); + let batch = arrow::array::RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["order1"])), + Arc::new(arrow::array::UInt64Array::from(vec![1u64])), + Arc::new(arrow::array::StringArray::from(vec!["a@t.com"])), + ], + ).unwrap(); + let 
mem_table = datafusion::datasource::MemTable::try_new( + schema, + vec![vec![batch]], + ).unwrap(); + ctx.register_table("orders", Arc::new(mem_table)).unwrap(); + + // Query with a join (two TableScans) + let df = ctx.sql( + "SELECT c.name, o.order_name FROM customers c JOIN orders o ON c.name = o.order_name" + ).await.unwrap(); + let plan = df.logical_plan().clone(); + + let actor = ActorContext::new("u@t.com".into(), 7, vec!["viewer".into()]); + let rewritten = apply_rls(plan, actor).unwrap(); + let s = plan_str(&rewritten); + + // Count occurrences of the tenant filter — should appear at least twice + // (one per TableScan). + let tenant_count = s.matches("tenant_id = UInt64(7)").count(); + assert!( + tenant_count >= 2, + "expected tenant_id filter on both tables, found {tenant_count} in: {s}" + ); + } + + #[tokio::test] + async fn empty_roles_is_non_admin() { + let plan = simple_scan_plan().await; + let actor = ActorContext::new("u@t.com".into(), 1, vec![]); + let rewritten = apply_rls(plan, actor).unwrap(); + let s = plan_str(&rewritten); + assert!(s.contains("actor_id"), "empty roles should produce actor_id filter: {s}"); + } + + #[test] + fn rls_rewriter_name() { + let r = RlsRewriter::new(ActorContext::new("x".into(), 0, vec![])); + assert_eq!(r.name(), "rls_tenant_filter"); + } +} diff --git a/crates/lance-graph-contract/src/auth.rs b/crates/lance-graph-contract/src/auth.rs new file mode 100644 index 00000000..08e28dd2 --- /dev/null +++ b/crates/lance-graph-contract/src/auth.rs @@ -0,0 +1,150 @@ +//! Actor identity context for authentication and authorization. +//! +//! `ActorContext` is the identity envelope extracted from a JWT and +//! carried through the request lifecycle. It is consumed by: +//! +//! - The RLS rewriter (callcenter `rls.rs`) to inject tenant + actor +//! predicates into every `TableScan`. +//! - The audit log (future) to attribute mutations to an actor. +//! - The SMB session layer to scope queries to a tenant. +//! +//! 
Lives in the zero-dep contract crate so any consumer can reference +//! the identity shape without pulling in serde, DataFusion, or axum. +//! +//! # Design Decisions (LF-3) +//! +//! - UNKNOWN-3 resolved: RLS via DataFusion LogicalPlan rewriter, NOT +//! pgwire. Predicates are injected as an optimizer rule. +//! - UNKNOWN-4 resolved: `actor_id: String` (JWT `sub` claim, unchanged). +//! `CommitFilter.actor_id: Option` stays as hash for fast filtering +//! at the commit fan-out layer. + +use crate::sla::TenantId; + +/// Identity context extracted from a JWT. Carried through the request +/// lifecycle; consumed by the RLS rewriter and audit log. +/// +/// # Fields +/// +/// - `actor_id` — JWT `sub` claim, unchanged. Canonical actor identity. +/// This is the string the JWT issuer assigned; we do NOT hash or +/// transform it. The `CommitFilter.actor_id: Option` is a +/// separate hash used for fast commit-level filtering. +/// +/// - `tenant_id` — Tenant identifier. Extracted from JWT custom claim +/// (`tenant_id` or `tid`) or derived from the `sub` domain. +/// +/// - `roles` — Roles the actor holds. Used by the RLS rewriter to +/// determine which predicates to inject. An actor with the `"admin"` +/// role bypasses the per-actor filter (still gets tenant isolation). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ActorContext { + /// JWT `sub` claim, unchanged. Canonical actor identity. + pub actor_id: String, + /// Tenant identifier. Extracted from JWT custom claim or derived. + pub tenant_id: TenantId, + /// Roles the actor holds. Used by the RLS rewriter to determine + /// which predicates to inject. + pub roles: Vec, +} + +impl ActorContext { + /// Create a new `ActorContext`. + pub fn new(actor_id: String, tenant_id: TenantId, roles: Vec) -> Self { + Self { actor_id, tenant_id, roles } + } + + /// Returns `true` if the actor holds the `"admin"` role. + /// + /// Admin actors bypass the per-actor RLS predicate but still get + /// tenant-scoped isolation. 
+ pub fn is_admin(&self) -> bool { + self.roles.iter().any(|r| r == "admin") + } +} + +/// Errors that can occur during JWT extraction / validation. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum AuthError { + /// Token is not in the expected `header.payload.signature` format. + MalformedToken, + /// Base64 decoding of the payload section failed. + InvalidBase64, + /// Payload JSON is not valid or is missing required fields. + InvalidPayload(String), + /// The `sub` claim is missing from the JWT payload. + MissingSub, + /// Signature verification failed (Phase 2 — not yet implemented). + InvalidSignature, +} + +impl core::fmt::Display for AuthError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::MalformedToken => write!(f, "JWT is not in header.payload.signature format"), + Self::InvalidBase64 => write!(f, "base64 decoding of JWT payload failed"), + Self::InvalidPayload(msg) => write!(f, "invalid JWT payload: {msg}"), + Self::MissingSub => write!(f, "JWT payload missing required 'sub' claim"), + Self::InvalidSignature => write!(f, "JWT signature verification failed"), + } + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn actor_context_new() { + let ctx = ActorContext::new( + "user@example.com".into(), + 42, + vec!["viewer".into()], + ); + assert_eq!(ctx.actor_id, "user@example.com"); + assert_eq!(ctx.tenant_id, 42); + assert_eq!(ctx.roles, vec!["viewer"]); + } + + #[test] + fn is_admin_true() { + let ctx = ActorContext::new("a".into(), 1, vec!["admin".into()]); + assert!(ctx.is_admin()); + } + + #[test] + fn is_admin_false_empty_roles() { + let ctx = ActorContext::new("a".into(), 1, vec![]); + assert!(!ctx.is_admin()); + } + + #[test] + fn is_admin_false_other_roles() { + let ctx = ActorContext::new("a".into(), 1, vec!["viewer".into(), "editor".into()]); + assert!(!ctx.is_admin()); + } + + #[test] + fn 
is_admin_among_many_roles() { + let ctx = ActorContext::new("a".into(), 1, vec!["viewer".into(), "admin".into()]); + assert!(ctx.is_admin()); + } + + #[test] + fn auth_error_display() { + assert_eq!( + AuthError::MalformedToken.to_string(), + "JWT is not in header.payload.signature format" + ); + assert_eq!( + AuthError::MissingSub.to_string(), + "JWT payload missing required 'sub' claim" + ); + assert_eq!( + AuthError::InvalidPayload("bad json".into()).to_string(), + "invalid JWT payload: bad json" + ); + } +} diff --git a/crates/lance-graph-contract/src/lib.rs b/crates/lance-graph-contract/src/lib.rs index 04fe044e..345ef9bc 100644 --- a/crates/lance-graph-contract/src/lib.rs +++ b/crates/lance-graph-contract/src/lib.rs @@ -64,3 +64,4 @@ pub mod reasoning; pub mod property; pub mod ontology; pub mod sla; +pub mod auth; From 7aa8bd63e1f94561524474e121efe2b15c69bd44 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 25 Apr 2026 13:20:41 +0000 Subject: [PATCH 3/3] chore: clean up unused imports in vsa_udfs.rs https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh --- crates/lance-graph-callcenter/src/vsa_udfs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/lance-graph-callcenter/src/vsa_udfs.rs b/crates/lance-graph-callcenter/src/vsa_udfs.rs index 6797ffbe..e70a706a 100644 --- a/crates/lance-graph-callcenter/src/vsa_udfs.rs +++ b/crates/lance-graph-callcenter/src/vsa_udfs.rs @@ -27,8 +27,9 @@ use std::sync::Arc; use arrow::array::{ Array, ArrayRef, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, - ListArray, ListBuilder, UInt16Array, UInt16Builder, UInt32Array, + ListArray, UInt16Array, UInt32Array, }; +use arrow::buffer::{Buffer, OffsetBuffer}; use arrow::datatypes::{DataType, Field}; use datafusion::error::{DataFusionError, Result as DfResult}; use datafusion::execution::context::SessionContext;