Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions docs/proposals/streaming-evaluation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Streaming evaluation

Status: Proposed (draft PR, design doc only).
Tracking: ROADMAP.md → Longer term.

## Motivation

The body-aware policy proposal (`body-aware-policies.md`) buffers the
full request body up to `max_body_bytes` before evaluating. That's
fine for small JSON payloads. It falls down for:

- Large file uploads. Buffering 50 MB to test "is this user allowed
to POST to this path" is wasteful when the decision doesn't need
the body at all.
- gRPC and protobuf streams where the policy may only need the first
few framed messages.
- Latency-sensitive paths where blocking on the full body adds tens
of milliseconds before the decision is even possible.

For policies that don't reference `input.body`, buffering should not
happen at all. For policies that reference only a prefix
(`input.body.action`, `input.body.user_id`), buffering should stop
as soon as the prefix is decidable. Streaming evaluation makes those
optimizations possible.

## Goals

1. Static AST analysis at configure time to classify each policy as
one of:

| Class | Body buffering required |
| ---------------------------- | ----------------------- |
| No body refs | None (decide on headers)|
| Body refs, prefix-only | Until prefix resolved |
| Body refs, full-tree | Up to `max_body_bytes` |

1. For prefix-only policies: a streaming JSON parser path that lets
`evaluate` decide partway through the body when the referenced
prefix is fully resolved. The rest of the body is allowed to flow
through unbuffered.
1. For no-body-refs policies: skip `proxy_on_request_body` entirely.
The decision is locked in at headers time.
1. The streaming path is opt-in. Policies that want strict, full-body
eval continue to use the buffered path; the analysis classifies
conservatively (when in doubt, full-tree).

## Non-goals

- Streaming response bodies. Same idea, but tracked separately under
`response-side-policies.md` extensions.
- Decisions on partial header sets. Headers are atomic in proxy-wasm;
no streaming there.
- Refusing the body mid-stream after data has already been forwarded.
zopa decides allow/deny *before* the body forwards (Envoy buffers
internally up to its own configured limits).

## Design sketch

### AST classifier

A static walk over the configured policy AST that records every
`input.body...` ref encountered:

```zig
const BodyDeps = struct {
refs_body: bool = false,
refs_paths: std.ArrayList([]const []const u8), // prefix tree

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The refs_paths field is described as a "prefix tree" but typed as a flat ArrayList of paths. For efficient matching during streaming, especially if a policy has many references, a true trie or prefix tree structure would be more appropriate to allow $O(depth)$ lookups instead of $O(N)$ comparisons against all registered paths.

refs_whole: bool = false, // body referenced as a unit
};

fn classifyBodyDeps(module: *const ast.Module) BodyDeps;
```

`refs_whole = true` happens when the policy uses `input.body` directly
(not just a sub-path) or when iteration would require the full
parsed object. In that case, fall back to the buffered path.

### Streaming JSON parser

`src/json.zig` gains an iterative path that emits `(path, value)`
events as the body streams in:

```zig
pub const StreamEvent = union(enum) {
enter_object: []const u8, // path so far
leave_object: void,
field: struct { path: [][]const u8, value: Value },
done: void,
};
Comment on lines +84 to +89

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The StreamEvent design appears to mix hierarchical and flat approaches. It provides enter_object but also includes a full path in each field event, which is redundant and expensive to allocate during streaming. Furthermore, using the recursive Value type for fields implies that nested objects are still fully buffered before being emitted. A more efficient streaming API would emit separate events for container boundaries and only use Value for scalar leaves.

```

The streaming evaluator subscribes to events and binds resolved refs
into the `input` lazily. As soon as every ref in the policy's prefix
set is resolved, evaluation can run.

### Decision short-circuit

Once the streaming evaluator reaches a resolved decision, it tells
the host:

- allow → return `Continue` from `proxy_on_request_body` for the
current chunk and stop subscribing to body events.
- deny → call `proxy_send_local_response(403)` and return Pause.

If the body finishes before the prefix set is fully resolved (the
caller didn't include the expected field), treat the missing path as
undefined (deny-by-default per Rego semantics).

## API impact

- New plugin config field `streaming: { enabled: bool, max_buffer:
size }`. Default `enabled: true` once the implementation has at
least one full release of stability.
- Per-context state grows to hold the partial input being assembled.
- AST schema unchanged.

## Test plan

- Unit tests for `classifyBodyDeps` covering each class of policy.
- Streaming parser unit tests: feed a JSON byte-by-byte, assert
events fire in order.
- Integration test: a policy that references only `input.body.action`
decides before the full payload is delivered. Measure that
evaluation latency is independent of payload size.
- Negative test: a policy that needs `input.body.items[*].sku` falls
back to the buffered path even with `streaming.enabled: true`.

## Open questions

- The streaming parser nearly doubles the size of `src/json.zig`.
Worth running a sizing experiment before committing: does
`--release=small` keep zopa.wasm under 80 KB with both paths
present?
Comment on lines +130 to +133

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To address the binary size concern, consider refactoring src/json.zig to use a shared low-level lexer. Both the existing recursive-descent parser and the new streaming evaluator can consume this lexer, avoiding duplication of complex logic like string escape decoding and number parsing.

- How aggressive is the prefix analysis? A conservative pass classifies
more policies as "full-tree" and forfeits the optimization. A
precise pass requires reasoning about `some` / `every` over body
arrays, which is non-trivial.
- Should the streaming path also feed Envoy's body forwarding so
large uploads don't pause? Probably yes, but the proxy-wasm body
ABI semantics need a careful read first.
184 changes: 184 additions & 0 deletions src/body_deps.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! Static analysis of body dependencies in a compiled policy.
//!
//! Walks the AST once, classifying how the policy references the
//! request body. The proxy-wasm shim uses the result to decide
//! whether to skip `proxy_on_request_body` entirely (no body refs),
//! buffer until specific paths resolve (prefix-only), or buffer the
//! whole body up to `max_body_bytes` (full-tree).
//!
//! Streaming evaluation itself (per `docs/proposals/streaming-evaluation.md`)
//! depends on the body-aware callback path landing first; this
//! analyser is the configure-time piece that ships independently.

const std = @import("std");
const ast = @import("ast.zig");

pub const Class = enum {
/// Policy does not reference `input.body` anywhere.
no_body_refs,
/// Policy references body sub-paths (e.g. `input.body.amount`).
/// A streaming evaluator can decide as soon as those resolve.
prefix_only,
/// Policy references `input.body` as a whole or iterates over
/// the body's contents. The full body must be buffered.
full_tree,
};

pub const BodyDeps = struct {
class: Class,
/// Number of distinct body sub-paths referenced when
/// `class == .prefix_only`. Always 0 for the other classes.
prefix_count: usize,
};

/// Classify body usage of every rule reachable in `module`.
/// Conservative: when in doubt, returns `.full_tree`.
pub fn analyze(module: ast.Module) BodyDeps {
var st = State{};
for (module.rules) |rule| {
for (rule.body) |expr| visit(&st, expr);
if (rule.value) |v| visit(&st, v);
}
return st.finalize();
}

const State = struct {
refs_whole: bool = false,
prefix_count: usize = 0,

fn finalize(self: State) BodyDeps {
if (self.refs_whole) return .{ .class = .full_tree, .prefix_count = 0 };
if (self.prefix_count == 0) return .{ .class = .no_body_refs, .prefix_count = 0 };
return .{ .class = .prefix_only, .prefix_count = self.prefix_count };
}
};

fn visit(st: *State, expr: *const ast.Expr) void {
switch (expr.*) {
.value => {},
.ref => |path| visitRef(st, path),
.compare => |c| {
visit(st, c.left);
visit(st, c.right);
},
.not => |inner| visit(st, inner),
.some, .every => |it| {
visit(st, it.source);
visit(st, it.body);
// Iterating over a ref into the body is full-tree:
// the iterator needs the entire collection. The source
// visit above marks the path; we promote to whole if
// the source is itself an `input.body...` ref.
if (it.source.* == .ref) {
if (refTouchesBody(it.source.ref)) st.refs_whole = true;
}
},
.call => |c| for (c.args) |arg| visit(st, arg),
}
}

fn visitRef(st: *State, path: []const []const u8) void {
if (!refTouchesBody(path)) return;

// `input.body` (or just `body`) by itself = whole-tree dependency.
// Body sub-paths (input.body.amount, body.user) = prefix-only.
const body_index = bodySegmentIndex(path) orelse return;
if (body_index + 1 >= path.len) {
st.refs_whole = true;
} else {
st.prefix_count += 1;
}
}

fn refTouchesBody(path: []const []const u8) bool {
return bodySegmentIndex(path) != null;
}

/// Locate the `body` segment inside an input ref. Accepts both
/// `["input", "body", ...]` and the shorthand `["body", ...]`.
fn bodySegmentIndex(path: []const []const u8) ?usize {
if (path.len == 0) return null;
if (std.mem.eql(u8, path[0], "body")) return 0;
if (path.len >= 2 and std.mem.eql(u8, path[0], "input") and std.mem.eql(u8, path[1], "body")) {
return 1;
}
return null;
}

const testing = std.testing;
const json = @import("json.zig");

fn classify(src: []const u8) !Class {
var arena = std.heap.ArenaAllocator.init(testing.allocator);
defer arena.deinit();
const node = try json.parse(arena.allocator(), src);
const module = try ast.buildModule(arena.allocator(), node);
return analyze(module).class;
}

test "analyze: no body refs -> no_body_refs" {
const policy =
"{\"type\":\"eq\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"method\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":\"GET\"}}";
try testing.expectEqual(Class.no_body_refs, try classify(policy));
}

test "analyze: input.body.amount -> prefix_only" {
const policy =
"{\"type\":\"gt\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":100}}";
try testing.expectEqual(Class.prefix_only, try classify(policy));
}

test "analyze: bare input.body -> full_tree" {
const policy =
"{\"type\":\"neq\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":null}}";
try testing.expectEqual(Class.full_tree, try classify(policy));
}

test "analyze: iterate input.body.items -> full_tree" {
const policy =
"{\"type\":\"every\",\"var\":\"item\"," ++
"\"source\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"items\"]}," ++
"\"body\":{\"type\":\"value\",\"value\":true}}";
try testing.expectEqual(Class.full_tree, try classify(policy));
}

test "analyze: prefix_count counts distinct body refs" {
const policy =
"{\"type\":\"module\",\"rules\":[" ++
"{\"type\":\"rule\",\"name\":\"allow\",\"body\":[" ++
"{\"type\":\"eq\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":\"submit\"}}," ++
"{\"type\":\"gt\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"amount\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":0}}]}]}";
var arena = std.heap.ArenaAllocator.init(testing.allocator);
defer arena.deinit();
const node = try json.parse(arena.allocator(), policy);
const module = try ast.buildModule(arena.allocator(), node);
const deps = analyze(module);
try testing.expectEqual(Class.prefix_only, deps.class);
try testing.expectEqual(@as(usize, 2), deps.prefix_count);
}

test "analyze: body shorthand path (no input prefix) detected" {
const policy =
"{\"type\":\"eq\"," ++
"\"left\":{\"type\":\"ref\",\"path\":[\"body\",\"x\"]}," ++
"\"right\":{\"type\":\"value\",\"value\":1}}";
try testing.expectEqual(Class.prefix_only, try classify(policy));
}

test "analyze: call with body arg -> prefix_only" {
const policy =
"{\"type\":\"call\",\"name\":\"startswith\",\"args\":[" ++
"{\"type\":\"ref\",\"path\":[\"input\",\"body\",\"action\"]}," ++
"{\"type\":\"value\",\"value\":\"approve_\"}]}";
try testing.expectEqual(Class.prefix_only, try classify(policy));
}
2 changes: 2 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! `main.zig`.

pub const ast = @import("ast.zig");
pub const body_deps = @import("body_deps.zig");
pub const builtins = @import("builtins.zig");
pub const eval = @import("eval.zig");
pub const json = @import("json.zig");
Expand All @@ -24,6 +25,7 @@ test {
const testing = @import("std").testing;
testing.refAllDecls(@This());
_ = ast;
_ = body_deps;
_ = builtins;
_ = eval;
_ = json;
Expand Down
Loading