proposal: streaming evaluation #15
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| # Streaming evaluation | ||
|
|
||
| Status: Proposed (draft PR, design doc only). | ||
| Tracking: ROADMAP.md → Longer term. | ||
|
|
||
| ## Motivation | ||
|
|
||
| The body-aware policy proposal (`body-aware-policies.md`) buffers the | ||
| full request body up to `max_body_bytes` before evaluating. That's | ||
| fine for small JSON payloads. It falls down for: | ||
|
|
||
| - Large file uploads. Buffering 50 MB to test "is this user allowed | ||
| to POST to this path" is wasteful when the decision doesn't need | ||
| the body at all. | ||
| - gRPC and protobuf streams where the policy may only need the first | ||
| few framed messages. | ||
| - Latency-sensitive paths where blocking on the full body adds tens | ||
| of milliseconds before the decision is even possible. | ||
|
|
||
| For policies that don't reference `input.body`, buffering should not | ||
| happen at all. For policies that reference only a prefix | ||
| (`input.body.action`, `input.body.user_id`), buffering should stop | ||
| as soon as the prefix is decidable. Streaming evaluation makes those | ||
| optimizations possible. | ||
|
|
||
| ## Goals | ||
|
|
||
| 1. Static AST analysis at configure time to classify each policy as | ||
| one of: | ||
|
|
||
| | Class | Body buffering required | | ||
| | ---------------------------- | ----------------------- | | ||
| | No body refs | None (decide on headers)| | ||
| | Body refs, prefix-only | Until prefix resolved | | ||
| | Body refs, full-tree | Up to `max_body_bytes` | | ||
|
|
||
| 1. For prefix-only policies: a streaming JSON parser path that lets | ||
| `evaluate` decide partway through the body when the referenced | ||
| prefix is fully resolved. The rest of the body is allowed to flow | ||
| through unbuffered. | ||
| 1. For no-body-refs policies: skip `proxy_on_request_body` entirely. | ||
| The decision is locked in at headers time. | ||
| 1. The streaming path is opt-in. Policies that want strict, full-body | ||
| eval continue to use the buffered path; the analysis classifies | ||
| conservatively (when in doubt, full-tree). | ||
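The goals above amount to a small decision table from policy class to buffering strategy. A minimal sketch, assuming a hypothetical `BufferPlan` type and `planFor` helper that are not part of zopa; treating the no-body-refs skip as independent of the `streaming.enabled` flag is also an assumption:

```zig
const std = @import("std");

// Mirrors the three classes in the table above.
const Class = enum { no_body_refs, prefix_only, full_tree };

// Hypothetical: what the shim should do with the request body.
const BufferPlan = union(enum) {
    skip_body, // decide at headers time, never buffer
    until_resolved: usize, // buffer until the prefix set resolves, capped
    full: usize, // buffer the whole body, capped
};

fn planFor(class: Class, streaming_enabled: bool, max_body_bytes: usize) BufferPlan {
    return switch (class) {
        .no_body_refs => .skip_body,
        // Streaming is opt-in: without it, prefix-only policies use the buffered path.
        .prefix_only => if (streaming_enabled)
            BufferPlan{ .until_resolved = max_body_bytes }
        else
            BufferPlan{ .full = max_body_bytes },
        .full_tree => BufferPlan{ .full = max_body_bytes },
    };
}
```

Computing the plan once at configure time would keep the per-request path free of any AST work.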
|
|
||
| ## Non-goals | ||
|
|
||
| - Streaming response bodies. Same idea, but tracked separately under | ||
| `response-side-policies.md` extensions. | ||
| - Decisions on partial header sets. Headers are atomic in proxy-wasm; | ||
| no streaming there. | ||
| - Refusing the body mid-stream after data has already been forwarded. | ||
| zopa decides allow/deny *before* the body forwards (Envoy buffers | ||
| internally up to its own configured limits). | ||
|
|
||
| ## Design sketch | ||
|
|
||
| ### AST classifier | ||
|
|
||
| A static walk over the configured policy AST that records every | ||
| `input.body...` ref encountered: | ||
|
|
||
| ```zig | ||
| const BodyDeps = struct { | ||
| refs_body: bool = false, | ||
| refs_paths: std.ArrayList([]const []const u8), // prefix tree | ||
| refs_whole: bool = false, // body referenced as a unit | ||
| }; | ||
|
|
||
| fn classifyBodyDeps(module: *const ast.Module) BodyDeps; | ||
| ``` | ||
|
|
||
| `refs_whole = true` happens when the policy uses `input.body` directly | ||
| (not just a sub-path) or when iteration would require the full | ||
| parsed object. In that case, fall back to the buffered path. | ||
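The `// prefix tree` comment on `refs_paths` suggests a trie rather than a flat list. One possible shape, as a sketch only: `PathTrie`, `Node`, the 64-node cap, and the first-child/next-sibling layout are illustrative assumptions chosen to avoid an allocator, not the proposal's design.

```zig
const std = @import("std");

// Hypothetical trie node; children form a singly linked sibling list.
const Node = struct {
    key: []const u8 = "",
    first_child: ?u16 = null,
    next_sibling: ?u16 = null,
    terminal: bool = false, // true if a referenced path ends here
};

const PathTrie = struct {
    nodes: [64]Node = [_]Node{.{}} ** 64, // illustrative fixed cap
    len: u16 = 1, // node 0 is the root (`input.body`)

    fn child(self: *const PathTrie, parent: u16, key: []const u8) ?u16 {
        var it = self.nodes[parent].first_child;
        while (it) |i| : (it = self.nodes[i].next_sibling) {
            if (std.mem.eql(u8, self.nodes[i].key, key)) return i;
        }
        return null;
    }

    fn insert(self: *PathTrie, path: []const []const u8) error{Full}!void {
        var cur: u16 = 0;
        for (path) |seg| {
            cur = self.child(cur, seg) orelse blk: {
                if (self.len == self.nodes.len) return error.Full;
                const idx = self.len;
                // Prepend the new node to the parent's child list.
                self.nodes[idx] = .{ .key = seg, .next_sibling = self.nodes[cur].first_child };
                self.nodes[cur].first_child = idx;
                self.len += 1;
                break :blk idx;
            };
        }
        self.nodes[cur].terminal = true;
    }

    fn contains(self: *const PathTrie, path: []const []const u8) bool {
        var cur: u16 = 0;
        for (path) |seg| {
            cur = self.child(cur, seg) orelse return false;
        }
        return self.nodes[cur].terminal;
    }
};
```

Lookups walk one level per path segment and scan only that level's siblings, so cost depends on path depth and fan-out rather than on the total number of registered paths.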
|
|
||
| ### Streaming JSON parser | ||
|
|
||
| `src/json.zig` gains an incremental parsing mode that emits | ||
| `(path, value)` events as the body streams in: | ||
|
|
||
| ```zig | ||
| pub const StreamEvent = union(enum) { | ||
| enter_object: []const []const u8, // path segments so far | ||
| leave_object: void, | ||
| field: struct { path: []const []const u8, value: Value }, | ||
| done: void, | ||
| }; | ||
|
||
| ``` | ||
|
|
||
| The streaming evaluator subscribes to events and binds resolved refs | ||
| into the `input` lazily. As soon as every ref in the policy's prefix | ||
| set is resolved, evaluation can run. | ||
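The "every ref is resolved" condition can be tracked with very little state. A minimal sketch, assuming a hypothetical `PrefixTracker` that is fed the path of each `field` event; the type, the `max_paths` cap, and the linear scan are illustrative choices, not part of the proposal:

```zig
const std = @import("std");

const max_paths = 16; // illustrative cap, not from the proposal

fn pathEql(a: []const []const u8, b: []const []const u8) bool {
    if (a.len != b.len) return false;
    for (a, b) |x, y| {
        if (!std.mem.eql(u8, x, y)) return false;
    }
    return true;
}

// Hypothetical: remembers which referenced body paths have been seen.
const PrefixTracker = struct {
    wanted: []const []const []const u8, // paths relative to `input.body`
    seen: [max_paths]bool = [_]bool{false} ** max_paths,

    // Call once per `field` event. Returns true when evaluation can run.
    fn onField(self: *PrefixTracker, path: []const []const u8) bool {
        std.debug.assert(self.wanted.len <= max_paths);
        for (self.wanted, 0..) |w, i| {
            if (pathEql(w, path)) self.seen[i] = true;
        }
        return self.ready();
    }

    fn ready(self: *const PrefixTracker) bool {
        for (self.seen[0..self.wanted.len]) |s| {
            if (!s) return false;
        }
        return true;
    }
};
```

Fields the policy never references leave the tracker untouched, so they need not be retained once parsed.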
|
|
||
| ### Decision short-circuit | ||
|
|
||
| Once the streaming evaluator reaches a resolved decision, it tells | ||
| the host: | ||
|
|
||
| - allow → return `Continue` from `proxy_on_request_body` for the | ||
| current chunk and stop subscribing to body events. | ||
| - deny → call `proxy_send_local_response(403)` and return `Pause`. | ||
|
|
||
| If the body finishes before the prefix set is fully resolved (the | ||
| caller didn't include the expected field), treat the missing path as | ||
| undefined (deny-by-default per Rego semantics). | ||
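The rules above can be summarised as a pure function from evaluator outcome to host action. A sketch under stated assumptions: `Outcome`, `Step`, and `onBodyChunk` are hypothetical names, and returning `Pause` while still undecided is an assumption that follows from deciding before the body forwards. The `Action` values match the proxy-wasm ABI (`Continue = 0`, `Pause = 1`).

```zig
const std = @import("std");

// proxy-wasm action returned from body callbacks.
const Action = enum(u32) { Continue = 0, Pause = 1 };

// Hypothetical evaluator state after processing a chunk.
const Outcome = enum { allow, deny, undecided };

// Hypothetical result: which action to return, and whether to send a 403 first.
const Step = struct { action: Action, send_403: bool };

fn onBodyChunk(outcome: Outcome, end_of_stream: bool) Step {
    return switch (outcome) {
        .allow => Step{ .action = .Continue, .send_403 = false },
        .deny => Step{ .action = .Pause, .send_403 = true },
        // Body ended with the prefix set unresolved: the missing path is
        // undefined, so the request is denied by default.
        .undecided => if (end_of_stream)
            Step{ .action = .Pause, .send_403 = true }
        else
            Step{ .action = .Pause, .send_403 = false },
    };
}
```

Keeping this mapping free of host calls makes it testable without an Envoy instance; the shim would call `proxy_send_local_response` only when `send_403` is set.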
|
|
||
| ## API impact | ||
|
|
||
| - New plugin config field `streaming: { enabled: bool, max_buffer: | ||
| size }`. `enabled` defaults to `false` (opt-in) and flips to `true` | ||
| once the implementation has been stable for at least one full release. | ||
| - Per-context state grows to hold the partial input being assembled. | ||
| - AST schema unchanged. | ||
|
|
||
| ## Test plan | ||
|
|
||
| - Unit tests for `classifyBodyDeps` covering each class of policy. | ||
| - Streaming parser unit tests: feed a JSON document byte by byte and | ||
| assert that events fire in order. | ||
| - Integration test: a policy that references only `input.body.action` | ||
| decides before the full payload is delivered. Verify that | ||
| evaluation latency is independent of payload size. | ||
| - Negative test: a policy that needs `input.body.items[*].sku` falls | ||
| back to the buffered path even with `streaming.enabled: true`. | ||
|
|
||
| ## Open questions | ||
|
|
||
| - The streaming parser nearly doubles the size of `src/json.zig`. | ||
| Worth running a sizing experiment before committing: does | ||
| `--release=small` keep zopa.wasm under 80 KB with both paths | ||
| present? | ||
|
||
| - How aggressive is the prefix analysis? A conservative pass classifies | ||
| more policies as "full-tree" and forfeits the optimization. A | ||
| precise pass requires reasoning about `some` / `every` over body | ||
| arrays, which is non-trivial. | ||
| - Should the streaming path also feed Envoy's body forwarding so | ||
| large uploads don't pause? Probably yes, but the proxy-wasm body | ||
| ABI semantics need a careful read first. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| //! Static analysis of body dependencies in a compiled policy. | ||
| //! | ||
| //! Walks the AST once, classifying how the policy references the | ||
| //! request body. The proxy-wasm shim uses the result to decide | ||
| //! whether to skip `proxy_on_request_body` entirely (no body refs), | ||
| //! buffer until specific paths resolve (prefix-only), or buffer the | ||
| //! whole body up to `max_body_bytes` (full-tree). | ||
| //! | ||
| //! Streaming evaluation itself (per `docs/proposals/streaming-evaluation.md`) | ||
| //! depends on the body-aware callback path landing first; this | ||
| //! analyser is the configure-time piece that ships independently. | ||
|
|
||
| const std = @import("std"); | ||
| const ast = @import("ast.zig"); | ||
|
|
||
| pub const Class = enum { | ||
| /// Policy does not reference `input.body` anywhere. | ||
| no_body_refs, | ||
| /// Policy references body sub-paths (e.g. `input.body.amount`). | ||
| /// A streaming evaluator can decide as soon as those resolve. | ||
| prefix_only, | ||
| /// Policy references `input.body` as a whole or iterates over | ||
| /// the body's contents. The full body must be buffered. | ||
| full_tree, | ||
| }; | ||
|
|
||
| pub const BodyDeps = struct { | ||
| class: Class, | ||
| /// Number of body sub-path references encountered when | ||
| /// `class == .prefix_only`. Repeated references to the same path | ||
| /// are counted each time. Always 0 for the other classes. | ||
| prefix_count: usize, | ||
| }; | ||
|
|
||
| /// Classify body usage of every rule reachable in `module`. | ||
| /// Conservative: when in doubt, returns `.full_tree`. | ||
| pub fn analyze(module: ast.Module) BodyDeps { | ||
| var st = State{}; | ||
| for (module.rules) |rule| { | ||
| for (rule.body) |expr| visit(&st, expr); | ||
| if (rule.value) |v| visit(&st, v); | ||
| } | ||
| return st.finalize(); | ||
| } | ||
|
|
||
| const State = struct { | ||
| refs_whole: bool = false, | ||
| prefix_count: usize = 0, | ||
|
|
||
| fn finalize(self: State) BodyDeps { | ||
| if (self.refs_whole) return .{ .class = .full_tree, .prefix_count = 0 }; | ||
| if (self.prefix_count == 0) return .{ .class = .no_body_refs, .prefix_count = 0 }; | ||
| return .{ .class = .prefix_only, .prefix_count = self.prefix_count }; | ||
| } | ||
| }; | ||
|
|
||
| fn visit(st: *State, expr: *const ast.Expr) void { | ||
| switch (expr.*) { | ||
| .value => {}, | ||
| .ref => |path| visitRef(st, path), | ||
| .compare => |c| { | ||
| visit(st, c.left); | ||
| visit(st, c.right); | ||
| }, | ||
| .not => |inner| visit(st, inner), | ||
| .some, .every => |it| { | ||
| visit(st, it.source); | ||
| visit(st, it.body); | ||
| // Iterating over a ref into the body is full-tree: | ||
| // the iterator needs the entire collection. The source | ||
| // visit above marks the path; we promote to whole if | ||
| // the source is itself an `input.body...` ref. | ||
| if (it.source.* == .ref) { | ||
| if (refTouchesBody(it.source.ref)) st.refs_whole = true; | ||
| } | ||
| }, | ||
| .call => |c| for (c.args) |arg| visit(st, arg), | ||
| } | ||
| } | ||
|
|
||
| fn visitRef(st: *State, path: []const []const u8) void { | ||
| if (!refTouchesBody(path)) return; | ||
|
|
||
| // `input.body` (or just `body`) by itself = whole-tree dependency. | ||
| // Body sub-paths (input.body.amount, body.user) = prefix-only. | ||
| const body_index = bodySegmentIndex(path) orelse return; | ||
| if (body_index + 1 >= path.len) { | ||
| st.refs_whole = true; | ||
| } else { | ||
| st.prefix_count += 1; | ||
| } | ||
| } | ||
|
|
||
| fn refTouchesBody(path: []const []const u8) bool { | ||
| return bodySegmentIndex(path) != null; | ||
| } | ||
|
|
||
| /// Locate the `body` segment inside an input ref. Accepts both | ||
| /// `["input", "body", ...]` and the shorthand `["body", ...]`. | ||
| fn bodySegmentIndex(path: []const []const u8) ?usize { | ||
| if (path.len == 0) return null; | ||
| if (std.mem.eql(u8, path[0], "body")) return 0; | ||
| if (path.len >= 2 and std.mem.eql(u8, path[0], "input") and std.mem.eql(u8, path[1], "body")) { | ||
| return 1; | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| const testing = std.testing; | ||
| const json = @import("json.zig"); | ||
|
|
||
| fn classify(src: []const u8) !Class { | ||
| var arena = std.heap.ArenaAllocator.init(testing.allocator); | ||
| defer arena.deinit(); | ||
| const node = try json.parse(arena.allocator(), src); | ||
| const module = try ast.buildModule(arena.allocator(), node); | ||
| return analyze(module).class; | ||
| } | ||
|
|
||
| test "analyze: no body refs -> no_body_refs" { | ||
| const policy = | ||
| "{\"type\":\"eq\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"method\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":\"GET\"}}"; | ||
| try testing.expectEqual(Class.no_body_refs, try classify(policy)); | ||
| } | ||
|
|
||
| test "analyze: input.body.amount -> prefix_only" { | ||
| const policy = | ||
| "{\"type\":\"gt\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":100}}"; | ||
| try testing.expectEqual(Class.prefix_only, try classify(policy)); | ||
| } | ||
|
|
||
| test "analyze: bare input.body -> full_tree" { | ||
| const policy = | ||
| "{\"type\":\"neq\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":null}}"; | ||
| try testing.expectEqual(Class.full_tree, try classify(policy)); | ||
| } | ||
|
|
||
| test "analyze: iterate input.body.items -> full_tree" { | ||
| const policy = | ||
| "{\"type\":\"every\",\"var\":\"item\"," ++ | ||
| "\"source\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"items\"]}," ++ | ||
| "\"body\":{\"type\":\"value\",\"value\":true}}"; | ||
| try testing.expectEqual(Class.full_tree, try classify(policy)); | ||
| } | ||
|
|
||
| test "analyze: prefix_count counts distinct body refs" { | ||
| const policy = | ||
| "{\"type\":\"module\",\"rules\":[" ++ | ||
| "{\"type\":\"rule\",\"name\":\"allow\",\"body\":[" ++ | ||
| "{\"type\":\"eq\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":\"submit\"}}," ++ | ||
| "{\"type\":\"gt\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":0}}]}]}"; | ||
| var arena = std.heap.ArenaAllocator.init(testing.allocator); | ||
| defer arena.deinit(); | ||
| const node = try json.parse(arena.allocator(), policy); | ||
| const module = try ast.buildModule(arena.allocator(), node); | ||
| const deps = analyze(module); | ||
| try testing.expectEqual(Class.prefix_only, deps.class); | ||
| try testing.expectEqual(@as(usize, 2), deps.prefix_count); | ||
| } | ||
|
|
||
| test "analyze: body shorthand path (no input prefix) detected" { | ||
| const policy = | ||
| "{\"type\":\"eq\"," ++ | ||
| "\"left\":{\"type\":\"ref\",\"path\":[\"body\",\"x\"]}," ++ | ||
| "\"right\":{\"type\":\"value\",\"value\":1}}"; | ||
| try testing.expectEqual(Class.prefix_only, try classify(policy)); | ||
| } | ||
|
|
||
| test "analyze: call with body arg -> prefix_only" { | ||
| const policy = | ||
| "{\"type\":\"call\",\"name\":\"startswith\",\"args\":[" ++ | ||
| "{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++ | ||
| "{\"type\":\"value\",\"value\":\"approve_\"}]}"; | ||
| try testing.expectEqual(Class.prefix_only, try classify(policy)); | ||
| } |
The `refs_paths` field is described as a "prefix tree" but typed as a flat `ArrayList` of paths. For efficient matching during streaming, especially if a policy has many references, a true trie or prefix tree structure would be more appropriate to allow $O(depth)$ lookups instead of $O(N)$ comparisons against all registered paths.