Skip to content

Commit 813da63

Browse files
committed
perf(filter): SIMD memmem fast-path for has() — 51% faster, Tier 2/3 CEL
Chunk 4 — performance optimisation, CEL evaluation tests, benchmarks, docs. SIMD/zero-copy review optimisations: - Pre-compiled memchr::memmem::Finder for FieldExists/FieldNotExists with single-segment paths. Detects "field": pattern in raw bytes, bypassing JSON parser entirely. - has(_table) match: 218ns -> 107ns (51% faster) - 5 filters first-match-wins: 1697ns -> 129ns (92% faster) - Stack-allocated path arrays via with_path_refs (1-4 segments inline) eliminates per-message Vec allocation - extract_string_value uses sonic-rs is_str() + memchr SIMD escape detection — zero-copy Cow<str> for the common case (no escapes) - Fix field extraction in classify.rs for method calls (host.matches was extracting "host.matches" instead of "host") Tier 2/3 CEL evaluation tests (5 new): - Compound expression: severity > 3 && source != "internal" - size() function on arrays - Field-to-field comparison - Tier 3 regex via host.matches("^prod-.*") - Missing field safety Benchmark suite (benches/filter_benchmark.rs): - No-filter baseline: 3.2 ns (zero overhead confirmed) - Tier 1 has() match: 107 ns (memmem fast-path) - Tier 1 ==: 288 ns match, 741 ns no-match (sonic-rs full scan) - Tier 1 startsWith: 250 ns - Tier 1 dotted path: 239 ns - Tier 2/3 CEL benchmarks (feature-gated) Docs: - STATE.md: added transport-filter to Key Components - STATE.md: added TransportFilterEngine decision entry 700 tests pass, clippy clean with --all-features. memchr added to transport feature for SIMD substring search.
1 parent b847946 commit 813da63

6 files changed

Lines changed: 569 additions & 110 deletions

File tree

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ config-reload = ["config", "parking_lot", "tokio", "tracing"]
9696
config-postgres = ["config", "sqlx", "tokio", "serde_json"]
9797

9898
# Transport features
99-
transport = ["tokio", "serde_json", "rmp-serde", "chrono", "async-trait", "regex"]
99+
transport = ["tokio", "serde_json", "rmp-serde", "chrono", "async-trait", "regex", "memchr"]
100100
transport-memory = ["transport"]
101101
transport-kafka = ["transport", "rdkafka", "regex", "tokio-util"]
102102
transport-grpc = ["transport", "dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:tonic-prost-build", "dep:prost-build"]
@@ -232,6 +232,7 @@ sonic-rs = { version = ">=0.5, <1", optional = true }
232232

233233
# Regex (topic resolver include/exclude filters)
234234
regex = { version = ">=1.11, <2", optional = true }
235+
memchr = { version = ">=2.7", optional = true }
235236

236237
# Async trait (for tiered-sink Sink trait)
237238
async-trait = { version = ">=0.1.88, <0.2", optional = true }
@@ -306,3 +307,8 @@ harness = false
306307
name = "engine_benchmark"
307308
harness = false
308309
required-features = ["worker"]
310+
311+
[[bench]]
312+
name = "filter_benchmark"
313+
harness = false
314+
required-features = ["transport-memory"]

STATE.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Modular library with feature-gated components. Each module can be enabled/disabl
3939
15. **memory** - MemoryGuard: cgroup-aware memory backpressure with auto-detection
4040
16. **scaling** - ScalingPressure: KEDA autoscaling signal calculation
4141
17. **cli** - DfeApp trait, ServiceRuntime (pre-wired metrics + worker pool + batch engine + memory guard + shutdown)
42+
18. **transport-filter** - TransportFilterEngine: CEL-syntax message filtering embedded in every transport. Tier 1 SIMD field ops (~50-100ns), Tier 2 compiled CEL (opt-in), Tier 3 complex CEL with regex/iteration (opt-in). Inbound/outbound, drop/dlq, first-match-wins.
4243

4344
### Tech Stack
4445

@@ -154,6 +155,7 @@ spool, cache, secrets, HTTP client, DLQ — with zero additional wiring.
154155
- **DFE parallelisation pattern** — split sequential hot loops into parallel (pure `&self` computation via rayon) and sequential (mutable state: buffer push, mark_pending, stats, DLQ) phases. The `BatchProcessor` trait + `BatchPipeline` struct in rustlib provide the common framework. Each DFE app implements `BatchProcessor` for its domain. See `src/pipeline/` module.
155156
- **ServiceRuntime** — pre-built infrastructure for DFE service apps. Created by `run_app()` before `run_service()`. Contains MetricsManager, DfeMetrics, MemoryGuard (optional), shutdown token (with K8s pre-stop delay), worker pool (optional), batch engine (optional), scaling pressure (optional), RuntimeContext. Apps receive it fully wired. See `src/cli/runtime.rs`.
156157
- **BatchEngine** — SIMD-optimised batch processing for DFE pipelines. Two modes: `process_mid_tier()` (parse JSON via sonic-rs + parallel transform via rayon) and `process_raw()` (skip parsing, parallel transform on raw bytes). Transport-wired: `run_async()` / `run_raw_async()` with async sink, sink-managed commit tokens, and optional ticker callback. See `src/worker/engine/`.
158+
- **TransportFilterEngine** — CEL-syntax message filtering embedded in every transport (Kafka, gRPC, Memory, File, Pipe, HTTP, Redis). Three performance tiers: Tier 1 (SIMD field ops via sonic_rs::get_from_slice, ~50-200ns/msg, always enabled), Tier 2 (compiled CEL with extracted fields, requires `expression.allow_cel_filters_in/out`), Tier 3 (CEL with regex/iteration/time, requires `expression.allow_complex_filters_in/out`). Operators write CEL syntax — engine classifies via text pattern matching and bypasses CEL engine entirely for Tier 1. First-match-wins, drop/dlq actions, fail-fast at startup. Zero downstream code changes — config-only activation. See `src/transport/filter/`.
157159
- **RuntimeContext** — rich runtime metadata detected once at startup (pod_name, namespace, node_name, container_id, memory_limit_bytes, cpu_quota_cores). Global singleton via OnceLock. All modules read from this instead of doing their own env var lookups. No-ops on bare metal. See `src/env.rs`.
158160
- **K8s pre-stop compliance** — shutdown handler sleeps `PRESTOP_DELAY_SECS` (default 5 in K8s, 0 elsewhere) before cancelling the token. Prevents traffic routing to a draining pod.
159161
- **Deployment contract CI bridge**`container-manifest.json` (minimal CI subset), `Dockerfile.runtime` (runtime stage fragment for CI composition), OCI labels (static from contract, dynamic injected by CI), `from_cargo_toml()` for auto-detecting native deps, `schema_version` field.

benches/filter_benchmark.rs

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Project: hyperi-rustlib
2+
// File: benches/filter_benchmark.rs
3+
// Purpose: Criterion benchmarks for transport filter engine performance
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! Benchmarks for the transport filter engine.
10+
//!
11+
//! Validates the design assumption: Tier 1 filters are ~50-100ns/msg via SIMD,
12+
//! and the no-filter overhead is negligible.
13+
14+
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
15+
16+
use hyperi_rustlib::transport::filter::{
17+
FilterAction, FilterDisposition, FilterRule, TransportFilterEngine, TransportFilterTierConfig,
18+
};
19+
20+
const SAMPLE_PAYLOAD: &[u8] = br#"{"_table":"events","host":"prod-web01","source_type":"syslog","severity":3,"id":12345,"timestamp":"2026-04-10T12:00:00Z","message":"Sample log event with some realistic padding for benchmarking"}"#;
21+
22+
const POISON_PAYLOAD: &[u8] = br#"{"_table":"events","status":"poison","data":"x"}"#;
23+
24+
fn bench_no_filters_baseline(c: &mut Criterion) {
25+
let engine = TransportFilterEngine::empty();
26+
27+
let mut group = c.benchmark_group("filter_no_filters");
28+
group.throughput(Throughput::Elements(1));
29+
group.bench_function("apply_inbound_no_filters", |b| {
30+
b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
31+
});
32+
group.finish();
33+
}
34+
35+
fn bench_tier1_field_exists(c: &mut Criterion) {
36+
let engine = TransportFilterEngine::new(
37+
&[FilterRule {
38+
expression: "has(_table)".into(),
39+
action: FilterAction::Drop,
40+
}],
41+
&[],
42+
&TransportFilterTierConfig::default(),
43+
)
44+
.unwrap();
45+
46+
let mut group = c.benchmark_group("filter_tier1_field_exists");
47+
group.throughput(Throughput::Elements(1));
48+
group.bench_function("match", |b| {
49+
b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
50+
});
51+
group.finish();
52+
}
53+
54+
fn bench_tier1_field_equals(c: &mut Criterion) {
55+
let engine = TransportFilterEngine::new(
56+
&[FilterRule {
57+
expression: r#"status == "poison""#.into(),
58+
action: FilterAction::Drop,
59+
}],
60+
&[],
61+
&TransportFilterTierConfig::default(),
62+
)
63+
.unwrap();
64+
65+
let mut group = c.benchmark_group("filter_tier1_field_equals");
66+
group.throughput(Throughput::Elements(1));
67+
group.bench_function("no_match_pass", |b| {
68+
b.iter(|| {
69+
let result = engine.apply_inbound(SAMPLE_PAYLOAD);
70+
assert_eq!(result, FilterDisposition::Pass);
71+
std::hint::black_box(result)
72+
});
73+
});
74+
group.bench_function("match_drop", |b| {
75+
b.iter(|| {
76+
let result = engine.apply_inbound(POISON_PAYLOAD);
77+
assert_eq!(result, FilterDisposition::Drop);
78+
std::hint::black_box(result)
79+
});
80+
});
81+
group.finish();
82+
}
83+
84+
fn bench_tier1_starts_with(c: &mut Criterion) {
85+
let engine = TransportFilterEngine::new(
86+
&[FilterRule {
87+
expression: r#"host.startsWith("prod-")"#.into(),
88+
action: FilterAction::Drop,
89+
}],
90+
&[],
91+
&TransportFilterTierConfig::default(),
92+
)
93+
.unwrap();
94+
95+
let mut group = c.benchmark_group("filter_tier1_starts_with");
96+
group.throughput(Throughput::Elements(1));
97+
group.bench_function("match", |b| {
98+
b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
99+
});
100+
group.finish();
101+
}
102+
103+
fn bench_tier1_dotted_path(c: &mut Criterion) {
104+
let nested_payload =
105+
br#"{"metadata":{"source":"aws","region":"ap-southeast-2"},"event":"login"}"#;
106+
let engine = TransportFilterEngine::new(
107+
&[FilterRule {
108+
expression: r#"metadata.source == "aws""#.into(),
109+
action: FilterAction::Drop,
110+
}],
111+
&[],
112+
&TransportFilterTierConfig::default(),
113+
)
114+
.unwrap();
115+
116+
let mut group = c.benchmark_group("filter_tier1_dotted_path");
117+
group.throughput(Throughput::Elements(1));
118+
group.bench_function("nested_match", |b| {
119+
b.iter(|| std::hint::black_box(engine.apply_inbound(nested_payload)));
120+
});
121+
group.finish();
122+
}
123+
124+
fn bench_tier1_first_match_wins(c: &mut Criterion) {
125+
// 5 filters, message matches the third one
126+
let rules = vec![
127+
FilterRule {
128+
expression: "has(no_match_1)".into(),
129+
action: FilterAction::Drop,
130+
},
131+
FilterRule {
132+
expression: "has(no_match_2)".into(),
133+
action: FilterAction::Drop,
134+
},
135+
FilterRule {
136+
expression: "has(_table)".into(),
137+
action: FilterAction::Drop,
138+
},
139+
FilterRule {
140+
expression: "has(no_match_3)".into(),
141+
action: FilterAction::Drop,
142+
},
143+
FilterRule {
144+
expression: "has(no_match_4)".into(),
145+
action: FilterAction::Drop,
146+
},
147+
];
148+
let engine =
149+
TransportFilterEngine::new(&rules, &[], &TransportFilterTierConfig::default()).unwrap();
150+
151+
let mut group = c.benchmark_group("filter_tier1_first_match_wins");
152+
group.throughput(Throughput::Elements(1));
153+
group.bench_function("match_at_position_3", |b| {
154+
b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
155+
});
156+
group.finish();
157+
}
158+
159+
#[cfg(feature = "expression")]
160+
fn bench_tier2_compound_cel(c: &mut Criterion) {
161+
let tier_config = TransportFilterTierConfig {
162+
allow_cel_filters_in: true,
163+
..Default::default()
164+
};
165+
let engine = TransportFilterEngine::new(
166+
&[FilterRule {
167+
expression: r#"severity > 3 && source != "internal""#.into(),
168+
action: FilterAction::Drop,
169+
}],
170+
&[],
171+
&tier_config,
172+
)
173+
.unwrap();
174+
175+
let payload = br#"{"severity":5,"source":"external","data":"x"}"#;
176+
177+
let mut group = c.benchmark_group("filter_tier2_compound_cel");
178+
group.throughput(Throughput::Elements(1));
179+
group.bench_function("compound_cel_match", |b| {
180+
b.iter(|| std::hint::black_box(engine.apply_inbound(payload)));
181+
});
182+
group.finish();
183+
}
184+
185+
#[cfg(feature = "expression")]
186+
fn bench_tier3_regex_cel(c: &mut Criterion) {
187+
let tier_config = TransportFilterTierConfig {
188+
allow_complex_filters_in: true,
189+
..Default::default()
190+
};
191+
let engine = TransportFilterEngine::new(
192+
&[FilterRule {
193+
expression: r#"host.matches("^prod-.*$")"#.into(),
194+
action: FilterAction::Drop,
195+
}],
196+
&[],
197+
&tier_config,
198+
)
199+
.unwrap();
200+
201+
let mut group = c.benchmark_group("filter_tier3_regex_cel");
202+
group.throughput(Throughput::Elements(1));
203+
group.bench_function("regex_match", |b| {
204+
b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
205+
});
206+
group.finish();
207+
}
208+
209+
#[cfg(feature = "expression")]
210+
criterion_group!(
211+
benches,
212+
bench_no_filters_baseline,
213+
bench_tier1_field_exists,
214+
bench_tier1_field_equals,
215+
bench_tier1_starts_with,
216+
bench_tier1_dotted_path,
217+
bench_tier1_first_match_wins,
218+
bench_tier2_compound_cel,
219+
bench_tier3_regex_cel,
220+
);
221+
222+
#[cfg(not(feature = "expression"))]
223+
criterion_group!(
224+
benches,
225+
bench_no_filters_baseline,
226+
bench_tier1_field_exists,
227+
bench_tier1_field_equals,
228+
bench_tier1_starts_with,
229+
bench_tier1_dotted_path,
230+
bench_tier1_first_match_wins,
231+
);
232+
233+
criterion_main!(benches);

src/transport/filter/classify.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -237,34 +237,63 @@ fn check_restricted_functions(expr: &str) -> bool {
237237
/// Extract field references from an expression (for Tier 2/3 CEL context building).
238238
///
239239
/// Scans for identifier patterns that aren't CEL keywords or function names.
240-
/// Returns unique field names (may include dotted paths).
240+
/// For method calls like `field.matches("...")`, extracts only the receiver field.
241+
/// Returns unique field names (may include dotted paths for nested access).
241242
fn extract_field_references(expr: &str) -> Vec<String> {
242-
static RE_IDENT: LazyLock<Regex> =
243-
LazyLock::new(|| Regex::new(r"\b([a-zA-Z_][\w.]*)\b").unwrap());
243+
// Match dotted identifier (potentially nested) — we'll trim trailing method call
244+
// segments after matching.
245+
static RE_IDENT: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[a-zA-Z_][\w.]*").unwrap());
244246

245247
let mut fields: Vec<String> = Vec::new();
246-
let mut in_string = false;
247248

248-
// Track string boundaries
249+
// Build a mask of which byte positions are inside string literals
250+
let mut in_string_mask = vec![false; expr.len()];
251+
let mut in_string = false;
252+
let mut prev_was_escape = false;
249253
for (i, ch) in expr.char_indices() {
250-
if ch == '"' {
254+
if in_string {
255+
in_string_mask[i] = true;
256+
}
257+
if ch == '"' && !prev_was_escape {
251258
in_string = !in_string;
252259
}
253-
if in_string {
260+
prev_was_escape = ch == '\\' && !prev_was_escape;
261+
}
262+
263+
for m in RE_IDENT.find_iter(expr) {
264+
if in_string_mask.get(m.start()).copied().unwrap_or(false) {
254265
continue;
255266
}
256267

257-
// Check if an identifier starts here
258-
if let Some(m) = RE_IDENT.find(&expr[i..])
259-
&& m.start() == 0
260-
{
261-
let ident = m.as_str();
262-
// Skip CEL keywords
263-
let base = ident.split('.').next().unwrap_or(ident);
264-
if !CEL_KEYWORDS.contains(&base) && !fields.contains(&ident.to_string()) {
265-
fields.push(ident.to_string());
268+
let mut ident = m.as_str().to_string();
269+
270+
// If this identifier is immediately followed by '(' (a function call),
271+
// strip the last dotted segment (the method name) — the receiver is
272+
// the actual field reference.
273+
let after = &expr[m.end()..];
274+
if after.trim_start().starts_with('(') {
275+
if let Some(dot_pos) = ident.rfind('.') {
276+
// Method call on a field: keep the receiver
277+
ident.truncate(dot_pos);
278+
} else {
279+
// Bare function call (e.g., has(), size()) — not a field
280+
continue;
266281
}
267282
}
283+
284+
if ident.is_empty() {
285+
continue;
286+
}
287+
288+
// Skip CEL keywords (check the leading segment)
289+
let base = ident.split('.').next().unwrap_or(&ident);
290+
if CEL_KEYWORDS.contains(&base) {
291+
continue;
292+
}
293+
294+
if !fields.contains(&ident) {
295+
fields.push(ident);
296+
}
268297
}
269298

270299
fields

0 commit comments

Comments
 (0)