Skip to content

Commit 99f67a0

Browse files
committed
A-unlock-policy: generalized PolicyRewriter trait + ColumnMaskRewriter (epiphany E1)
1 parent 6133eb2 commit 99f67a0

1 file changed

Lines changed: 309 additions & 0 deletions

File tree

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
//! Policy-layer rewriting framework.
2+
//!
3+
//! Generalization of `crate::rls::RlsRewriter`. The same DataFusion
4+
//! `OptimizerRule` machinery used for tenant-predicate injection serves
5+
//! column masking, row-level encryption, and differential-privacy noise
6+
//! injection. Each policy type is a `PolicyRewriter` impl; they compose
7+
//! via the optimizer rule chain.
8+
//!
9+
//! See PR #278 outlook epiphany E1 for the full motivation.
10+
//!
11+
//! META-AGENT: add `pub mod policy;` to lib.rs gated by `feature = "policy"`.
12+
//! Default to including `policy` feature in `auth-rls-lite` (it's purely
13+
//! additive). Suggested Cargo.toml entry:
14+
//!
15+
//! ```toml
16+
//! policy = ["auth-rls-lite"]
17+
//! ```
18+
19+
use std::sync::Arc;
20+
21+
#[cfg(feature = "auth-rls-lite")]
22+
use datafusion::common::tree_node::Transformed;
23+
#[cfg(feature = "auth-rls-lite")]
24+
use datafusion::common::Result as DFResult;
25+
#[cfg(feature = "auth-rls-lite")]
26+
use datafusion::logical_expr::LogicalPlan;
27+
#[cfg(feature = "auth-rls-lite")]
28+
use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
29+
30+
// ── Policy taxonomy ──────────────────────────────────────────────────────────
31+
32+
/// Policy classification — what kind of transform a rewriter implements.
33+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
34+
pub enum PolicyKind {
35+
/// Inject row-filter predicates (e.g. `tenant_id = 't1'`).
36+
RowFilter,
37+
/// Mask / redact / hash columns based on actor role.
38+
ColumnMask,
39+
/// Encrypt selected columns at rest using a key handle.
40+
RowEncryption,
41+
/// Inject differential-privacy noise into aggregate outputs.
42+
DifferentialPrivacy,
43+
/// Emit audit events (read-only side channel).
44+
Audit,
45+
}
46+
47+
/// Generalized policy rewriter. Implementors transform a LogicalPlan and
48+
/// declare their kind for ordering / introspection.
49+
#[cfg(feature = "auth-rls-lite")]
50+
pub trait PolicyRewriter: Send + Sync + std::fmt::Debug {
51+
fn kind(&self) -> PolicyKind;
52+
/// Stable name (e.g. "rls_rewriter", "column_mask"). Used by audit log.
53+
fn name(&self) -> &'static str;
54+
/// Rewrite predicate. Default = identity (subclasses override what they need).
55+
fn rewrite_plan(&self, plan: LogicalPlan) -> DFResult<Transformed<LogicalPlan>> {
56+
Ok(Transformed::no(plan))
57+
}
58+
}
59+
60+
// ── Column masking policy ────────────────────────────────────────────────────
61+
62+
/// Per-column redaction mode. Drives how `ColumnMaskRewriter` rewrites
63+
/// referenced expressions in `Projection` nodes.
64+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65+
pub enum RedactionMode {
66+
/// Replace with NULL.
67+
Null,
68+
/// Replace with a constant ("[REDACTED]").
69+
Constant,
70+
/// Hash via FNV-64 (stable across builds).
71+
Hash,
72+
/// First-N-chars only (e.g. credit card last-4).
73+
Truncate(usize),
74+
}
75+
76+
/// Column masking policy: redact / hash / mask values from selected columns
77+
/// based on actor role. Stub UDF reference; concrete UDFs land in a follow-up.
78+
#[derive(Debug, Clone)]
79+
pub struct ColumnMaskPolicy {
80+
/// Table whose columns this policy applies to.
81+
pub table_name: String,
82+
/// Per-column redaction mode. Missing column = unmasked.
83+
pub columns: std::collections::HashMap<String, RedactionMode>,
84+
}
85+
86+
#[derive(Debug, Default, Clone)]
87+
pub struct ColumnMaskRegistry {
88+
policies: std::collections::HashMap<String, ColumnMaskPolicy>,
89+
}
90+
91+
impl ColumnMaskRegistry {
92+
pub fn new() -> Self {
93+
Self::default()
94+
}
95+
pub fn register(&mut self, policy: ColumnMaskPolicy) {
96+
self.policies.insert(policy.table_name.clone(), policy);
97+
}
98+
pub fn lookup(&self, table_name: &str) -> Option<&ColumnMaskPolicy> {
99+
self.policies.get(table_name)
100+
}
101+
}
102+
103+
#[cfg(feature = "auth-rls-lite")]
104+
#[derive(Debug)]
105+
pub struct ColumnMaskRewriter {
106+
pub registry: Arc<ColumnMaskRegistry>,
107+
pub actor_role: String,
108+
}
109+
110+
#[cfg(feature = "auth-rls-lite")]
111+
impl PolicyRewriter for ColumnMaskRewriter {
112+
fn kind(&self) -> PolicyKind {
113+
PolicyKind::ColumnMask
114+
}
115+
fn name(&self) -> &'static str {
116+
"column_mask"
117+
}
118+
fn rewrite_plan(&self, plan: LogicalPlan) -> DFResult<Transformed<LogicalPlan>> {
119+
// Walk plan; on Projection, rewrite expressions for redacted columns.
120+
// For this PR ship the structural skeleton; the actual UDF wrap lands
121+
// in a follow-up once redaction UDFs are registered.
122+
// TODO: wrap Expr::Column(c) in mask_udf(...) for c in policy.columns
123+
Ok(Transformed::no(plan))
124+
}
125+
}
126+
127+
#[cfg(feature = "auth-rls-lite")]
128+
impl OptimizerRule for ColumnMaskRewriter {
129+
fn name(&self) -> &str {
130+
"column_mask_rewriter"
131+
}
132+
fn apply_order(&self) -> Option<ApplyOrder> {
133+
Some(ApplyOrder::TopDown)
134+
}
135+
fn supports_rewrite(&self) -> bool {
136+
true
137+
}
138+
fn rewrite(
139+
&self,
140+
plan: LogicalPlan,
141+
_config: &dyn OptimizerConfig,
142+
) -> DFResult<Transformed<LogicalPlan>> {
143+
self.rewrite_plan(plan)
144+
}
145+
}
146+
147+
// ── Row encryption policy (stub, no executor yet) ────────────────────────────
148+
149+
/// Row encryption policy: encrypt selected columns at rest using a key
150+
/// handle. The actual cipher binding lands in a follow-up; this carries
151+
/// the per-column key-handle association so the registry surface is stable.
152+
#[derive(Debug, Clone)]
153+
pub struct RowEncryptionPolicy {
154+
/// Table whose columns this policy applies to.
155+
pub table_name: String,
156+
/// Per-column key handle. Missing column = unencrypted.
157+
pub columns: std::collections::HashMap<String, KeyHandle>,
158+
}
159+
160+
/// Opaque key handle. Resolves through a downstream KMS / keyring service.
161+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
162+
pub struct KeyHandle(pub String);
163+
164+
#[derive(Debug, Default, Clone)]
165+
pub struct RowEncryptionRegistry {
166+
policies: std::collections::HashMap<String, RowEncryptionPolicy>,
167+
}
168+
169+
impl RowEncryptionRegistry {
170+
pub fn new() -> Self {
171+
Self::default()
172+
}
173+
pub fn register(&mut self, policy: RowEncryptionPolicy) {
174+
self.policies.insert(policy.table_name.clone(), policy);
175+
}
176+
pub fn lookup(&self, table_name: &str) -> Option<&RowEncryptionPolicy> {
177+
self.policies.get(table_name)
178+
}
179+
}
180+
181+
// ── Differential-privacy policy (stub, no executor yet) ──────────────────────
182+
183+
/// DP noise mechanism. Drives the noise distribution used when the
184+
/// rewriter wraps aggregate outputs.
185+
#[derive(Debug, Clone, Copy, PartialEq)]
186+
pub enum DpMechanism {
187+
/// Laplace mechanism — calibrated to ε.
188+
Laplace,
189+
/// Gaussian mechanism — calibrated to (ε, δ).
190+
Gaussian,
191+
}
192+
193+
/// Differential-privacy policy. Carries the privacy budget and noise
194+
/// mechanism; the rewriter wraps SUM/COUNT/AVG aggregates with calibrated
195+
/// noise injection (follow-up PR).
196+
#[derive(Debug, Clone)]
197+
pub struct DifferentialPrivacyPolicy {
198+
/// Table this policy applies to.
199+
pub table_name: String,
200+
/// Privacy budget. Smaller ε = more noise = stronger privacy.
201+
pub epsilon: f64,
202+
/// Noise mechanism.
203+
pub mechanism: DpMechanism,
204+
}
205+
206+
#[derive(Debug, Default, Clone)]
207+
pub struct DifferentialPrivacyRegistry {
208+
policies: std::collections::HashMap<String, DifferentialPrivacyPolicy>,
209+
}
210+
211+
impl DifferentialPrivacyRegistry {
212+
pub fn new() -> Self {
213+
Self::default()
214+
}
215+
pub fn register(&mut self, policy: DifferentialPrivacyPolicy) {
216+
self.policies.insert(policy.table_name.clone(), policy);
217+
}
218+
pub fn lookup(&self, table_name: &str) -> Option<&DifferentialPrivacyPolicy> {
219+
self.policies.get(table_name)
220+
}
221+
}
222+
223+
// ── Tests ────────────────────────────────────────────────────────────────────
224+
225+
#[cfg(test)]
226+
mod tests {
227+
use super::*;
228+
use std::collections::HashMap;
229+
use std::collections::HashSet;
230+
231+
#[test]
232+
fn column_mask_registry_register_lookup() {
233+
let mut registry = ColumnMaskRegistry::new();
234+
let mut columns = HashMap::new();
235+
columns.insert("ssn".to_string(), RedactionMode::Hash);
236+
columns.insert("card".to_string(), RedactionMode::Truncate(4));
237+
registry.register(ColumnMaskPolicy {
238+
table_name: "customers".to_string(),
239+
columns,
240+
});
241+
242+
let policy = registry.lookup("customers").expect("policy registered");
243+
assert_eq!(policy.table_name, "customers");
244+
assert_eq!(policy.columns.get("ssn"), Some(&RedactionMode::Hash));
245+
assert_eq!(
246+
policy.columns.get("card"),
247+
Some(&RedactionMode::Truncate(4))
248+
);
249+
assert!(registry.lookup("missing").is_none());
250+
}
251+
252+
#[test]
253+
fn redaction_mode_variants_distinct() {
254+
// Each variant is a distinct value; equality is structural.
255+
assert_ne!(RedactionMode::Null, RedactionMode::Constant);
256+
assert_ne!(RedactionMode::Hash, RedactionMode::Null);
257+
assert_ne!(RedactionMode::Truncate(4), RedactionMode::Truncate(8));
258+
assert_eq!(RedactionMode::Truncate(4), RedactionMode::Truncate(4));
259+
}
260+
261+
#[cfg(feature = "auth-rls-lite")]
262+
#[test]
263+
fn column_mask_rewriter_kind_is_column_mask() {
264+
let rewriter = ColumnMaskRewriter {
265+
registry: Arc::new(ColumnMaskRegistry::new()),
266+
actor_role: "analyst".to_string(),
267+
};
268+
assert_eq!(rewriter.kind(), PolicyKind::ColumnMask);
269+
assert_eq!(<ColumnMaskRewriter as PolicyRewriter>::name(&rewriter), "column_mask");
270+
}
271+
272+
#[test]
273+
fn policy_kind_is_hashable_for_dispatch() {
274+
// PolicyKind being Hash + Eq lets a registry dispatch
275+
// rewriters by kind in a HashSet/HashMap. Smoke-test the trait
276+
// bounds by inserting into a HashSet.
277+
let mut set: HashSet<PolicyKind> = HashSet::new();
278+
set.insert(PolicyKind::RowFilter);
279+
set.insert(PolicyKind::ColumnMask);
280+
set.insert(PolicyKind::RowEncryption);
281+
set.insert(PolicyKind::DifferentialPrivacy);
282+
set.insert(PolicyKind::Audit);
283+
// Inserting a duplicate should be a no-op.
284+
assert!(!set.insert(PolicyKind::ColumnMask));
285+
assert_eq!(set.len(), 5);
286+
}
287+
288+
#[cfg(feature = "auth-rls-lite")]
289+
#[test]
290+
fn column_mask_rewriter_passes_through_for_now() {
291+
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
292+
use std::sync::Arc as StdArc;
293+
294+
let rewriter = ColumnMaskRewriter {
295+
registry: Arc::new(ColumnMaskRegistry::new()),
296+
actor_role: "analyst".to_string(),
297+
};
298+
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
299+
produce_one_row: false,
300+
schema: StdArc::new(datafusion::common::DFSchema::empty()),
301+
});
302+
let transformed = rewriter
303+
.rewrite_plan(plan)
304+
.expect("rewrite should succeed");
305+
// Skeleton implementation — should be a no-op until the UDF wrap
306+
// lands.
307+
assert!(!transformed.transformed);
308+
}
309+
}

0 commit comments

Comments
 (0)