diff --git a/docs/proposals/streaming-evaluation.md b/docs/proposals/streaming-evaluation.md new file mode 100644 index 0000000..9005915 --- /dev/null +++ b/docs/proposals/streaming-evaluation.md @@ -0,0 +1,140 @@ +# Streaming evaluation + +Status: Proposed (draft PR, design doc only). +Tracking: ROADMAP.md → Longer term. + +## Motivation + +The body-aware policy proposal (`body-aware-policies.md`) buffers the +full request body up to `max_body_bytes` before evaluating. That's +fine for small JSON payloads. It falls down for: + +- Large file uploads. Buffering 50 MB to test "is this user allowed + to POST to this path" is wasteful when the decision doesn't need + the body at all. +- gRPC and protobuf streams where the policy may only need the first + few framed messages. +- Latency-sensitive paths where blocking on the full body adds tens + of milliseconds before the decision is even possible. + +For policies that don't reference `input.body`, buffering should not +happen at all. For policies that reference only a prefix +(`input.body.action`, `input.body.user_id`), buffering should stop +as soon as the prefix is decidable. Streaming evaluation makes those +optimizations possible. + +## Goals + +1. Static AST analysis at configure time to classify each policy as + one of: + + | Class | Body buffering required | + | ---------------------------- | ----------------------- | + | No body refs | None (decide on headers)| + | Body refs, prefix-only | Until prefix resolved | + | Body refs, full-tree | Up to `max_body_bytes` | + +1. For prefix-only policies: a streaming JSON parser path that lets + `evaluate` decide partway through the body when the referenced + prefix is fully resolved. The rest of the body is allowed to flow + through unbuffered. +1. For no-body-refs policies: skip `proxy_on_request_body` entirely. + The decision is locked in at headers time. +1. The streaming path is opt-in. Policies that want strict, full-body + eval continue to use the buffered path; the analysis classifies + conservatively (when in doubt, full-tree). + +## Non-goals + +- Streaming response bodies. Same idea, but tracked separately under + `response-side-policies.md` extensions. +- Decisions on partial header sets. Headers are atomic in proxy-wasm; + no streaming there. +- Refusing the body mid-stream after data has already been forwarded. + zopa decides allow/deny *before* the body forwards (Envoy buffers + internally up to its own configured limits). + +## Design sketch + +### AST classifier + +A static walk over the configured policy AST that records every +`input.body...` ref encountered: + +```zig +const BodyDeps = struct { + refs_body: bool = false, + refs_paths: std.ArrayList([]const []const u8), // prefix tree + refs_whole: bool = false, // body referenced as a unit +}; + +fn classifyBodyDeps(module: *const ast.Module) BodyDeps; +``` + +`refs_whole = true` happens when the policy uses `input.body` directly +(not just a sub-path) or when iteration would require the full +parsed object. In that case, fall back to the buffered path. + +### Streaming JSON parser + +`src/json.zig` gains an iterative path that emits `(path, value)` +events as the body streams in: + +```zig +pub const StreamEvent = union(enum) { + enter_object: []const u8, // path so far + leave_object: void, + field: struct { path: [][]const u8, value: Value }, + done: void, +}; +``` + +The streaming evaluator subscribes to events and binds resolved refs +into the `input` lazily. As soon as every ref in the policy's prefix +set is resolved, evaluation can run. + +### Decision short-circuit + +Once the streaming evaluator reaches a resolved decision, it tells +the host: + +- allow → return `Continue` from `proxy_on_request_body` for the + current chunk and stop subscribing to body events. +- deny → call `proxy_send_local_response(403)` and return Pause. + +If the body finishes before the prefix set is fully resolved (the +caller didn't include the expected field), treat the missing path as +undefined (deny-by-default per Rego semantics). + +## API impact + +- New plugin config field `streaming: { enabled: bool, max_buffer: + size }`. Default `enabled: true` once the implementation has at + least one full release of stability. +- Per-context state grows to hold the partial input being assembled. +- AST schema unchanged. + +## Test plan + +- Unit tests for `classifyBodyDeps` covering each class of policy. +- Streaming parser unit tests: feed a JSON byte-by-byte, assert + events fire in order. +- Integration test: a policy that references only `input.body.action` + decides before the full payload is delivered. Measure that + evaluation latency is independent of payload size. +- Negative test: a policy that needs `input.body.items[*].sku` falls + back to the buffered path even with `streaming.enabled: true`. + +## Open questions + +- The streaming parser nearly doubles the size of `src/json.zig`. + Worth running a sizing experiment before committing: does + `--release=small` keep zopa.wasm under 80 KB with both paths + present? +- How aggressive is the prefix analysis? A conservative pass classifies + more policies as "full-tree" and forfeits the optimization. A + precise pass requires reasoning about `some` / `every` over body + arrays, which is non-trivial. +- Should the streaming path also feed Envoy's body forwarding so + large uploads don't pause? Probably yes, but the proxy-wasm body + ABI semantics need a careful read first. diff --git a/src/body_deps.zig b/src/body_deps.zig new file mode 100644 index 0000000..652230b --- /dev/null +++ b/src/body_deps.zig @@ -0,0 +1,184 @@ +//! Static analysis of body dependencies in a compiled policy. +//! +//! Walks the AST once, classifying how the policy references the +//! request body. The proxy-wasm shim uses the result to decide +//! whether to skip `proxy_on_request_body` entirely (no body refs), +//! buffer until specific paths resolve (prefix-only), or buffer the +//! whole body up to `max_body_bytes` (full-tree). +//! +//! Streaming evaluation itself (per `docs/proposals/streaming-evaluation.md`) +//! depends on the body-aware callback path landing first; this +//! analyser is the configure-time piece that ships independently. + +const std = @import("std"); +const ast = @import("ast.zig"); + +pub const Class = enum { + /// Policy does not reference `input.body` anywhere. + no_body_refs, + /// Policy references body sub-paths (e.g. `input.body.amount`). + /// A streaming evaluator can decide as soon as those resolve. + prefix_only, + /// Policy references `input.body` as a whole or iterates over + /// the body's contents. The full body must be buffered. + full_tree, +}; + +pub const BodyDeps = struct { + class: Class, + /// Number of distinct body sub-paths referenced when + /// `class == .prefix_only`. Always 0 for the other classes. + prefix_count: usize, +}; + +/// Classify body usage of every rule reachable in `module`. +/// Conservative: when in doubt, returns `.full_tree`. +pub fn analyze(module: ast.Module) BodyDeps { + var st = State{}; + for (module.rules) |rule| { + for (rule.body) |expr| visit(&st, expr); + if (rule.value) |v| visit(&st, v); + } + return st.finalize(); +} + +const State = struct { + refs_whole: bool = false, + prefix_count: usize = 0, + + fn finalize(self: State) BodyDeps { + if (self.refs_whole) return .{ .class = .full_tree, .prefix_count = 0 }; + if (self.prefix_count == 0) return .{ .class = .no_body_refs, .prefix_count = 0 }; + return .{ .class = .prefix_only, .prefix_count = self.prefix_count }; + } +}; + +fn visit(st: *State, expr: *const ast.Expr) void { + switch (expr.*) { + .value => {}, + .ref => |path| visitRef(st, path), + .compare => |c| { + visit(st, c.left); + visit(st, c.right); + }, + .not => |inner| visit(st, inner), + .some, .every => |it| { + visit(st, it.source); + visit(st, it.body); + // Iterating over a ref into the body is full-tree: + // the iterator needs the entire collection. The source + // visit above marks the path; we promote to whole if + // the source is itself an `input.body...` ref. + if (it.source.* == .ref) { + if (refTouchesBody(it.source.ref)) st.refs_whole = true; + } + }, + .call => |c| for (c.args) |arg| visit(st, arg), + } +} + +fn visitRef(st: *State, path: []const []const u8) void { + if (!refTouchesBody(path)) return; + + // `input.body` (or just `body`) by itself = whole-tree dependency. + // Body sub-paths (input.body.amount, body.user) = prefix-only. + const body_index = bodySegmentIndex(path) orelse return; + if (body_index + 1 >= path.len) { + st.refs_whole = true; + } else { + st.prefix_count += 1; + } +} + +fn refTouchesBody(path: []const []const u8) bool { + return bodySegmentIndex(path) != null; +} + +/// Locate the `body` segment inside an input ref. Accepts both +/// `["input", "body", ...]` and the shorthand `["body", ...]`. +fn bodySegmentIndex(path: []const []const u8) ?usize { + if (path.len == 0) return null; + if (std.mem.eql(u8, path[0], "body")) return 0; + if (path.len >= 2 and std.mem.eql(u8, path[0], "input") and std.mem.eql(u8, path[1], "body")) { + return 1; + } + return null; +} + +const testing = std.testing; +const json = @import("json.zig"); + +fn classify(src: []const u8) !Class { + var arena = std.heap.ArenaAllocator.init(testing.allocator); + defer arena.deinit(); + const node = try json.parse(arena.allocator(), src); + const module = try ast.buildModule(arena.allocator(), node); + return analyze(module).class; +} + +test "analyze: no body refs -> no_body_refs" { + const policy = + "{\"type\":\"eq\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"method\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":\"GET\"}}"; + try testing.expectEqual(Class.no_body_refs, try classify(policy)); +} + +test "analyze: input.body.amount -> prefix_only" { + const policy = + "{\"type\":\"gt\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":100}}"; + try testing.expectEqual(Class.prefix_only, try classify(policy)); +} + +test "analyze: bare input.body -> full_tree" { + const policy = + "{\"type\":\"neq\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":null}}"; + try testing.expectEqual(Class.full_tree, try classify(policy)); +} + +test "analyze: iterate input.body.items -> full_tree" { + const policy = + "{\"type\":\"every\",\"var\":\"item\"," ++ + "\"source\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"items\"]}," ++ + "\"body\":{\"type\":\"value\",\"value\":true}}"; + try testing.expectEqual(Class.full_tree, try classify(policy)); +} + +test "analyze: prefix_count counts distinct body refs" { + const policy = + "{\"type\":\"module\",\"rules\":[" ++ + "{\"type\":\"rule\",\"name\":\"allow\",\"body\":[" ++ + "{\"type\":\"eq\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":\"submit\"}}," ++ + "{\"type\":\"gt\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":0}}]}]}"; + var arena = std.heap.ArenaAllocator.init(testing.allocator); + defer arena.deinit(); + const node = try json.parse(arena.allocator(), policy); + const module = try ast.buildModule(arena.allocator(), node); + const deps = analyze(module); + try testing.expectEqual(Class.prefix_only, deps.class); + try testing.expectEqual(@as(usize, 2), deps.prefix_count); +} + +test "analyze: body shorthand path (no input prefix) detected" { + const policy = + "{\"type\":\"eq\"," ++ + "\"left\":{\"type\":\"ref\",\"path\":[\"body\",\"x\"]}," ++ + "\"right\":{\"type\":\"value\",\"value\":1}}"; + try testing.expectEqual(Class.prefix_only, try classify(policy)); +} + +test "analyze: call with body arg -> prefix_only" { + const policy = + "{\"type\":\"call\",\"name\":\"startswith\",\"args\":[" ++ + "{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++ + "{\"type\":\"value\",\"value\":\"approve_\"}]}"; + try testing.expectEqual(Class.prefix_only, try classify(policy)); +} diff --git a/src/root.zig b/src/root.zig index 95c0a06..4efb9d8 100644 --- a/src/root.zig +++ b/src/root.zig @@ -7,6 +7,7 @@ //! `main.zig`. pub const ast = @import("ast.zig"); +pub const body_deps = @import("body_deps.zig"); pub const builtins = @import("builtins.zig"); pub const eval = @import("eval.zig"); pub const json = @import("json.zig"); @@ -24,6 +25,7 @@ test { const testing = @import("std").testing; testing.refAllDecls(@This()); _ = ast; + _ = body_deps; _ = builtins; _ = eval; _ = json;