Commit 927b53c

test(filter): Python-Rust parity, concurrency, DLQ buffer drain
Add the final review findings from the pre-merge audit:

* Python<->Rust classifier parity via shared JSON fixture (tests/fixtures/cel_classifier_parity.json). The matching dfe-engine test loads the same fixture and runs every entry through the Python classifier; both sides must agree on tier, op kind, op field, op value, and field references.
* Concurrency test: 32 tokio tasks share one Arc<TransportFilterEngine> and classify 1000 messages each. Also a compile-time assert_send_sync check for FilterRule/Action/Disposition.
* DLQ buffer drain tests covering take_filtered_dlq_entries(): entries are exposed, draining empties the buffer, drop filters don't pollute the buffer, and no filters means zero overhead.
* Memmem nested-field limitation: documents the known false-positive case where has(field) matches a nested occurrence (needed for the 50% performance gain on the common top-level case) and the sound case where JSON-escaped quotes prevent false positives inside string values.
* Per-transport smoke test for MemoryTransport filter wiring.

Also add docs/TRANSPORT-FILTER-FOLLOWUP.md tracking items #7-#11: constant-time compare, log masking, pre-quoted equals fast path, msgpack edges, and expression_text reuse on reload.
1 parent 4fe1d1c commit 927b53c

3 files changed

Lines changed: 548 additions & 0 deletions

File tree

docs/TRANSPORT-FILTER-FOLLOWUP.md

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
## Transport Filter Engine — Follow-ups

Tracked items uncovered during the v2.4.7 review of `feat/transport-filter-engine`.
None of these block the merge; they are improvements and hardening work to schedule
when transport filters see broader production use.

### #7 — Constant-time string comparison for sensitive fields

**Context.** `CompiledFilter::FieldEquals` / `FieldNotEquals` use plain `==`
for the field-value comparison. If a downstream operator writes a filter
on a high-entropy, secret-like field (e.g. an API token, an opaque session
ID, an HMAC), the early exit of byte-for-byte equality makes comparison
time depend on the length of the matching prefix, which leaks information
about the expected value through timing.

**Risk.** Low for the production use case (transport filters key off
routing fields, not secrets), but the door is open. An attacker who can
inject crafted payloads and observe per-message latency could in theory
brute-force a guarded field value one byte at a time.

**Proposed fix.** When the filter expression compares against a value
flagged as "sensitive" (heuristic match: more than 16 bytes of high-entropy
data, a match for `/[A-Za-z0-9+/=]{20,}/`, or explicit opt-in via filter
metadata), use `subtle::ConstantTimeEq` for the comparison. Add a benchmark
to measure the cost, which should be negligible compared to the surrounding
`sonic-rs` field extraction.

**Where.** `src/transport/filter/compiled.rs::evaluate()` and a new
`is_sensitive_pattern()` helper.
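
The shape of the proposed fix for #7 might look roughly like the sketch below. It is std-only so that it stands alone: the hand-rolled `constant_time_eq` loop is a stand-in for `subtle::ConstantTimeEq`, not a replacement for it, and `field_equals` is a hypothetical name. The body of `is_sensitive_pattern` is an assumption that covers only the 20-character base64-run heuristic, not the entropy check or the metadata opt-in.

```rust
/// Compares two byte slices without exiting on the first mismatch.
/// Length is treated as public: a length mismatch returns immediately.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; the loop always runs to the end.
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    // black_box discourages the optimizer from reintroducing an early exit.
    std::hint::black_box(diff) == 0
}

/// Assumed heuristic: a run of 20 or more base64-alphabet bytes.
fn is_sensitive_pattern(value: &str) -> bool {
    let mut run = 0usize;
    for b in value.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'+' | b'/' | b'=') {
            run += 1;
            if run >= 20 {
                return true;
            }
        } else {
            run = 0;
        }
    }
    false
}

/// Hypothetical comparison entry point: slow path only for sensitive values.
fn field_equals(actual: &str, expected: &str) -> bool {
    if is_sensitive_pattern(expected) {
        constant_time_eq(actual.as_bytes(), expected.as_bytes())
    } else {
        actual == expected
    }
}
```

Because the sensitivity check depends only on the expected value, it could be evaluated once at filter compile time and stored as a flag, which would keep the per-message cost to the comparison itself.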

---

### #8 — Log masking for filter expression content

**Context.** Filter compilation, ordering warnings, and DLQ routing all
emit `tracing` events that include the raw expression text (e.g.
`"filter compiled: tier=tier1 expr=field == \"prod-secret\""`). If an
operator embeds a literal secret in a filter (bad practice but possible),
that secret ends up in logs.

**Risk.** Low. The engine's existing `tracing-throttle` integration
already rate-limits these events, and rustlib's `logger` module masks
known sensitive field names. But the expression text itself is not run
through the masker.

**Proposed fix.** Add an `expression_redacted()` helper that:
- Replaces string literals (anything between `"…"`) with `"<redacted>"`
- Keeps the structural form (`field == "<redacted>"`) for debugging
- Is the only form ever written to logs

Plumb it through every `tracing::warn!` / `tracing::info!` call site in
`compiled.rs`, `mod.rs`, and `metrics.rs`.
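
A minimal sketch of what `expression_redacted()` could look like, assuming string literals are always double-quoted and use backslash escapes. The signature is a guess (the real helper may well be a method on `CompiledFilter`), and the scanner below is deliberately simple rather than a CEL-aware tokenizer.

```rust
/// Replaces every double-quoted string literal with "<redacted>".
/// Backslash escapes inside a literal are skipped so that \" does not end it.
fn expression_redacted(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut chars = expr.chars();
    while let Some(c) = chars.next() {
        if c != '"' {
            out.push(c);
            continue;
        }
        // Consume the literal up to its closing quote (or end of input).
        while let Some(inner) = chars.next() {
            match inner {
                '\\' => {
                    chars.next(); // skip the escaped character
                }
                '"' => break,
                _ => {}
            }
        }
        out.push_str("\"<redacted>\"");
    }
    out
}
```

With this sketch, `field == "prod-secret"` is logged as `field == "<redacted>"`, while an expression without literals such as `has(_table)` passes through unchanged. An unterminated literal is redacted to the end of the input, which errs on the side of hiding too much.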

---

### #9 — Pre-quoted bytes for `field == "value"` fast path

**Context.** `FieldEquals` currently extracts the field value via
`sonic-rs` (which produces a `LazyValue`), runs `extract_string_value`
to unescape it into a `Cow<str>`, and then string-compares against the
expected value. For the very common case of a single-segment field
compared against an ASCII string with no escapes, we could pre-bake the
bytes `"…":"value"` once at compile time and use a single `memmem::find`,
the same trick `FieldExists` uses.

**Estimated win.** ~30-50% on the `field == "value"` path (currently
~250-400 ns, target ~150-200 ns).

**Caveats.**
- Only safe when both the field name AND the value contain no characters
  that JSON must escape (`\`, `"`, control bytes). Otherwise the literal
  pattern doesn't match the encoded form.
- Inherits the same nested-field false-positive limitation as
  `FieldExists` (documented in the integration tests).
- Whitespace in the JSON (`{"field" : "value"}`) breaks the literal match
  and yields a false negative.

**Proposed fix.** At compile time, classify each field/value pair as
"fast-path eligible" or not. Build a `memchr::memmem::Finder<'static>` for
the eligible cases and short-circuit in `evaluate()`. Fall back to the
sonic-rs extraction otherwise.

**Where.** `src/transport/filter/compiled.rs::FieldEquals` and
`evaluate()`.
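
The eligibility check and pre-baked pattern for #9 can be sketched as follows. All three function names are hypothetical, and the naive `windows` search is only a std-only stand-in for the `memchr::memmem::Finder` the proposal calls for.

```rust
/// True when the text appears verbatim inside JSON quotes: printable ASCII
/// with no backslash or double quote, so literal and encoded forms coincide.
fn is_fast_path_eligible(s: &str) -> bool {
    s.bytes()
        .all(|b| (0x20..0x7f).contains(&b) && b != b'\\' && b != b'"')
}

/// Builds the literal bytes `"field":"value"` once, when the filter is compiled.
/// Returns None when the pair must take the extraction path instead.
fn prebake_pattern(field: &str, value: &str) -> Option<Vec<u8>> {
    // Dotted paths are multi-segment and need real field extraction.
    if field.contains('.') || !is_fast_path_eligible(field) || !is_fast_path_eligible(value) {
        return None;
    }
    Some(format!("\"{field}\":\"{value}\"").into_bytes())
}

/// Naive substring search, standing in for a memmem finder.
fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
    !needle.is_empty() && haystack.windows(needle.len()).any(|w| w == needle)
}
```

The caveats above show up directly in this sketch: a payload written as `{"source" : "internal"}` is missed, and a nested `{"outer":{"source":"internal"}}` is matched. A miss on the fast path therefore cannot be treated as a definitive non-match unless the producer is known to emit compact JSON.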

---

### #10 — MsgPack edge cases bypass filtering

**Context.** `apply_inbound`/`apply_outbound` detect the payload format
via `PayloadFormat::detect()` (cheap heuristic on the first byte). For
detected MsgPack payloads we currently skip the JSON-oriented filter
engine entirely; the integration test
`adversarial_msgpack_bypasses_filters` documents this.

**Risk.** Filter rules silently do nothing on MsgPack payloads. An
operator who configures `has(_drop_me)` on a Kafka topic carrying
MsgPack messages will see no filtering, no warning, and no metric.

**Proposed fix (cheap).** Emit a one-shot `tracing::warn!` per
filter-engine instance the first time a MsgPack payload is seen with
filters configured. Add a `dfe_transport_filter_bypassed_total{reason="msgpack"}`
counter so dashboards can flag the bypass.

**Proposed fix (expensive).** Compile a parallel MsgPack evaluator
(rmp-serde or rmpv) for each `CompiledFilter` variant. Tier 1 ops are
field extractions, which work the same way on MsgPack; the only thing
that changes is the lookup engine. Defer until there is a real customer
asking for MsgPack filtering.
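
The cheap fix for #10 comes down to a once-only flag plus a counter. A minimal sketch using std atomics is shown below; `BypassTracker` and its methods are hypothetical names, and the real engine would emit the `tracing::warn!` event and increment the metrics counter where this sketch only returns a flag and bumps an in-memory total.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Hypothetical per-engine record of MsgPack payloads that skipped filtering.
#[derive(Default)]
struct BypassTracker {
    warned: AtomicBool,
    bypassed_total: AtomicU64,
}

impl BypassTracker {
    /// Counts one bypassed payload. Returns true only for the first call,
    /// which is when the caller should emit the one-shot warning.
    fn record_msgpack_bypass(&self) -> bool {
        self.bypassed_total.fetch_add(1, Ordering::Relaxed);
        // swap returns the previous value, so exactly one caller sees `false`.
        !self.warned.swap(true, Ordering::Relaxed)
    }

    /// Current value of the bypass counter.
    fn bypassed_total(&self) -> u64 {
        self.bypassed_total.load(Ordering::Relaxed)
    }
}
```

Both operations take `&self`, so the tracker can sit behind the same `Arc` as the engine and be shared across tasks without a lock.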

---

### #11 — Preserve original `expression_text` through reload cycles

**Context.** `CompiledFilter::expression_text` stores the as-typed
expression string for use in error messages and ordering-warning logs.
On hot-reload (`ConfigReloader`), the engine is rebuilt from the new
config. If the new config is structurally identical to the old one,
the same compiled filter could be reused without re-allocating the text
buffer, but currently every reload allocates fresh `String`s for every rule.

**Estimated win.** Negligible per reload, but reloads on a busy system
with thousands of routing rules add up. Mostly an allocator-pressure
hygiene item.

**Proposed fix.** Hash the rule list during compile and cache compiled
filters keyed by hash. On reload, look up by hash before recompiling.

**Where.** `src/transport/filter/mod.rs::TransportFilterEngine::new`.
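
One way the hash-keyed cache for #11 might be structured is sketched below. `CompiledRules` and `CompileCache` are hypothetical stand-ins for the engine's real types, "compiling" here only copies the expression text, and the equality check after a hash hit is an added safeguard against collisions rather than something the proposal specifies.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Hypothetical stand-in for the engine's compiled rule set.
struct CompiledRules {
    expressions: Vec<String>,
}

/// Cache of compiled rule sets keyed by a hash of the ordered rule list.
#[derive(Default)]
struct CompileCache {
    entries: HashMap<u64, Arc<CompiledRules>>,
    compiles: usize,
}

fn hash_rules(rules: &[&str]) -> u64 {
    let mut hasher = DefaultHasher::new();
    rules.hash(&mut hasher); // hashes the length and each rule, so order matters
    hasher.finish()
}

impl CompileCache {
    fn get_or_compile(&mut self, rules: &[&str]) -> Arc<CompiledRules> {
        let key = hash_rules(rules);
        if let Some(hit) = self.entries.get(&key) {
            // Confirm the texts really match before reusing the cached entry.
            if hit.expressions.iter().map(String::as_str).eq(rules.iter().copied()) {
                return Arc::clone(hit);
            }
        }
        self.compiles += 1;
        let compiled = Arc::new(CompiledRules {
            expressions: rules.iter().map(|r| r.to_string()).collect(),
        });
        self.entries.insert(key, Arc::clone(&compiled));
        compiled
    }
}
```

A reload with an unchanged rule list then returns the existing `Arc` and allocates no new `String`s. Because rule order is part of the hash, a reordered list is compiled again, which seems appropriate given that ordering warnings depend on it.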
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
{
"_comment": "Shared test corpus for the CEL classifier. The Python classifier in dfe_engine.cel.classify and the Rust classifier in hyperi-rustlib src/transport/filter/classify.rs MUST agree on every entry. A copy of this file lives in /projects/hyperi-rustlib/tests/fixtures/cel_classifier_parity.json — keep them identical.",
"_columns": "expression: input. tier: 1|2|3. op_kind: optional Tier 1 op kind. op_field: optional Tier 1 field. op_value: optional Tier 1 comparison value. fields: optional sorted list of field references for Tier 2/3.",
"cases": [
{ "expression": "has(_table)", "tier": 1, "op_kind": "field_exists", "op_field": "_table" },
{ "expression": "has(metadata.source)", "tier": 1, "op_kind": "field_exists", "op_field": "metadata.source" },
{ "expression": " has( _internal ) ", "tier": 1, "op_kind": "field_exists", "op_field": "_internal" },
{ "expression": "!has(_internal)", "tier": 1, "op_kind": "field_not_exists", "op_field": "_internal" },
{ "expression": "! has( _table )", "tier": 1, "op_kind": "field_not_exists", "op_field": "_table" },
{ "expression": "source == \"internal\"", "tier": 1, "op_kind": "field_equals", "op_field": "source", "op_value": "internal" },
{ "expression": "source != \"external\"", "tier": 1, "op_kind": "field_not_equals", "op_field": "source", "op_value": "external" },
{ "expression": "host.startsWith(\"prod-\")", "tier": 1, "op_kind": "field_starts_with", "op_field": "host", "op_value": "prod-" },
{ "expression": "host.endsWith(\".internal\")", "tier": 1, "op_kind": "field_ends_with", "op_field": "host", "op_value": ".internal" },
{ "expression": "message.contains(\"error\")", "tier": 1, "op_kind": "field_contains", "op_field": "message", "op_value": "error" },
{ "expression": "metadata.host.startsWith(\"db-\")", "tier": 1, "op_kind": "field_starts_with", "op_field": "metadata.host", "op_value": "db-" },

{ "expression": "severity > 3", "tier": 2, "fields": ["severity"] },
{ "expression": "severity > 3 && source != \"internal\"", "tier": 2, "fields": ["severity", "source"] },
{ "expression": "size(tags) > 0", "tier": 2, "fields": ["tags"] },
{ "expression": "user.role == admin_role", "tier": 2, "fields": ["user.role", "admin_role"] },
{ "expression": "count >= 10 || count <= 1", "tier": 2, "fields": ["count"] },
{ "expression": "field == \"value with == in it\"", "tier": 1, "op_kind": "field_equals", "op_field": "field", "op_value": "value with == in it" },

{ "expression": "host.matches(\"^prod-.*$\")", "tier": 3, "fields": ["host"] },
{ "expression": "tags.exists(t, t == \"hot\")", "tier": 3, "fields": ["tags", "t"] },
{ "expression": "items.all(i, i.size > 0)", "tier": 3, "fields": ["i", "i.size", "items"] },
{ "expression": "tags.filter(t, t.startsWith(\"prod\")).size() > 0", "tier": 3, "fields": ["tags", "t"] },
{ "expression": "timestamp(created_at) > timestamp(\"2026-01-01\")", "tier": 3, "fields": ["created_at"] },
{ "expression": "duration(\"5m\") > duration(\"1m\")", "tier": 3, "fields": [] }
]
}
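
The `has(...)` cases at the top of the fixture imply that whitespace around the call, inside the parentheses, and after `!` must not change the classification. The sketch below shows one way a classifier could satisfy just those five cases. `Tier1Op` and `classify_has` are hypothetical names, and this is not the logic in `classify.rs`, which is not shown in this commit.

```rust
/// Hypothetical subset of Tier 1 operations, covering only the has() forms.
#[derive(Debug, PartialEq)]
enum Tier1Op {
    FieldExists(String),
    FieldNotExists(String),
}

/// Recognizes `has(path)` and `!has(path)`, ignoring surrounding whitespace.
/// Anything else returns None and would fall through to the other tiers.
fn classify_has(expr: &str) -> Option<Tier1Op> {
    let trimmed = expr.trim();
    let (negated, rest) = match trimmed.strip_prefix('!') {
        Some(r) => (true, r.trim_start()),
        None => (false, trimmed),
    };
    let inner = rest.strip_prefix("has(")?.strip_suffix(')')?.trim();
    // Accept only dotted identifier paths such as `_table` or `metadata.source`.
    let is_path = !inner.is_empty()
        && inner.split('.').all(|seg| {
            !seg.is_empty() && seg.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
        });
    if !is_path {
        return None;
    }
    Some(if negated {
        Tier1Op::FieldNotExists(inner.to_string())
    } else {
        Tier1Op::FieldExists(inner.to_string())
    })
}
```

The path check is what keeps a compound expression such as `has(a) && has(b)` out of Tier 1: stripping the outer `has(` and `)` leaves text containing characters that are not valid in a field path.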
