Skip to content

Commit 45c307a

Browse files
committed
fix: add async sink engine API, fix doc-tests — Phase 2A
BatchEngine gains run_async() and run_raw_async() with: - Async sink closure (enables ClickHouse inserts, Kafka produce) - Sink-managed commit tokens (deferred commit for loader pattern) - Optional ticker callback (flush timers within engine loop) - 4 new tests (MemoryTransport-backed) Doc-test fixes (No SEP): - flat_env: unsafe set_var in edition 2024 → no_run - registry: undefined MyConfig pseudo-code → ignore
1 parent 07136d5 commit 45c307a

4 files changed

Lines changed: 495 additions & 93 deletions

File tree

TODO.md

Lines changed: 90 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -8,89 +8,103 @@
88

99
## Current Tasks
1010

11-
### DFE Full Remediation — Rustlib v2.0→HEAD Catch-Up
12-
13-
All 6 DFE apps need full catch-up covering BatchEngine, ServiceRuntime,
14-
TopicResolver, metrics manifest, RuntimeContext, deployment contract, SIMD
15-
parse, pre-route filtering, field interning, etc.
16-
17-
Plan: `docs/superpowers/plans/2026-04-02-dfe-full-remediation.md`
18-
19-
Remediation order:
20-
- [ ] dfe-loader `[IN PROGRESS]`
21-
- Current state: waiting for CI to publish new rustlib version
22-
- Next: bump rustlib version, adopt ServiceRuntime + BatchEngine, delete bespoke TopicResolver
23-
- Heaviest — ~500 line delta
24-
- [ ] dfe-receiver
25-
- BatchEngine standalone `process_mid_tier` (HTTP inbound)
26-
- BatchAccumulator for Splunk HEC request batching
27-
- [ ] dfe-archiver
28-
- BatchEngine `run_raw` (passthrough, no parse)
29-
- TopicResolver auto-discovery (`topics: []`)
30-
- [ ] dfe-fetcher
31-
- BatchEngine standalone `process_mid_tier`
32-
- `fan_out_async` for within-source parallel service fetching
33-
- [ ] dfe-transform-vector
34-
- Lightest — ServiceRuntime only, no BatchEngine (Vector owns pipeline)
35-
- [ ] dfe-transform-vrl
36-
- BatchEngine `run()` replaces pipeline loop
37-
- sonic-rs replaces serde_json
38-
39-
Per-app remediation includes:
40-
- Bump hyperi-rustlib to latest
41-
- Push committed DfeMetrics::register(&mgr) fix
42-
- Adopt ServiceRuntime (remove manual MetricsManager/MemoryGuard/pool/shutdown wiring)
43-
- Adopt BatchEngine (replace manual recv→parse→transform loop)
44-
- TopicResolver where applicable
45-
- Generate deployment contract artefacts
11+
### DFE Full Remediation — Phase 2: Deep Integration (AMENDED 2026-04-03)
12+
13+
> **Scope directive:** Don't go light. The whole point of BatchEngine and
14+
> parallelisation is to FORCE architectural improvement in all apps. Every app
15+
> gets: SIMD parsing, pre-route filtering, chunked rayon, field interning,
16+
> standardised PipelineStats. Where the engine API doesn't fit, evolve the engine.
17+
>
18+
> **No SEP rule:** Every warning, clippy lint, doc-test failure, or bug
19+
> encountered during this remediation is OUR problem. We own every line we
20+
> touch AND every pre-existing issue in files we modify. Fix it there and then.
21+
22+
Phase 1 done (all 6 apps on rustlib >=2.4.3, ServiceRuntime, released).
23+
Phase 2 plan: `docs/superpowers/plans/2026-04-03-dfe-phase2-deep-integration.md`
24+
25+
**Phase 2A: Evolve BatchEngine API (rustlib)**
26+
- [ ] Async sink for run() / run_raw() (enables loader's async CH inserts)
27+
- [ ] Optional ticker callback (enables flush timers inside engine loop)
28+
- [ ] TransportReceiver impl for loader's TransportBackend (clean adapter)
29+
- [ ] Publish rustlib with engine changes
30+
31+
**Phase 2B: Per-App Integration**
32+
- [ ] dfe-loader: engine.process_mid_tier() replaces MessageProcessor
33+
- SIMD parse (sonic-rs), pre-route routing_field="_table", field interning
34+
- Transform closure wraps existing pipeline (route→extract→enrich)
35+
- BatchCoordinator stays (sequential buffer push, DLQ)
36+
- Orchestrator select! loop stays (5 arms: shutdown, config, flush, schema, recv)
37+
- [ ] dfe-transform-vrl: engine.process_mid_tier() replaces deser+VRL phases
38+
- Benchmark: sonic_rs::Value → vrl::Value conversion cost first
39+
- Single engine call replaces two pool.process_batch() calls
40+
- Produce stays sequential (async I/O)
41+
- [ ] dfe-archiver: engine.run_raw() or process_raw() + TopicResolver
42+
- Pre-route routing_field="_destination", TopicResolver auto-discovery
43+
- Flush timer as separate task (or engine ticker if evolved)
44+
- [ ] dfe-receiver: BatchAccumulator for Splunk HEC only (additive)
45+
- DO NOT replace zero-copy hot path (validation, routing, enrichment)
46+
- Existing optimisations proven faster than generic alternatives
47+
- [ ] dfe-fetcher: fan_out_async() + engine.process_raw() for post-fetch
48+
- JoinSet in each Source impl (AWS/Azure/M365/GCP)
49+
- Parallel enrich+filter via engine
50+
- [ ] dfe-transform-vector: generate-artefacts only
51+
- [ ] ALL apps: run `generate-artefacts` for deployment contract
52+
53+
**Phase 1 DONE (all apps):**
54+
- [x] Bump rustlib to >=2.4.3
55+
- [x] DfeMetrics::register(&mgr) signature fix
56+
- [x] ServiceRuntime adoption (where not already done)
57+
- [x] DeploymentContract schema_version + oci_labels fields
58+
- [x] Debug/trace logging across all pipeline stages
59+
- [x] Target symlink + orphan submodule fixes
60+
- [x] All 6 apps released with Phase 1 changes
61+
62+
### clickhouse-rs
63+
64+
- [x] Schema cache drift detection (issue #20 fix in dfe-loader)
65+
- [x] Expanded SchemaMismatch error classification
66+
- [x] Auto-invalidate schema cache on end() failure
67+
- [x] Nullable(JSON) RowBinary insert fix (c178b37)
68+
- [x] Nullable/non-nullable tests for ALL types
69+
- [x] Upstream PR: ClickHouse/clickhouse-rs#414
70+
- [ ] Monitor upstream PR review feedback
4671

4772
### Deferred
4873

4974
- [ ] Columnar (SoA) batch layout — backlogged, benchmark later
5075
- [ ] String interning deeper integration — custom Value type with interned keys
51-
- [ ] Disk buffer improvements (rkyv zero-copy) — separate concern (tiered-sink/spool)
52-
- [ ] Metrics manifest enrichment — `set_use_cases()` / `set_dashboard_hint()` content
76+
- [ ] Disk buffer improvements (rkyv zero-copy) — tiered-sink/spool
77+
- [ ] Metrics manifest enrichment — set_use_cases() / set_dashboard_hint()
5378
- [ ] Mutex audit documentation (in standards)
5479
- [ ] Phase 3: dfe-transform-wasm, dfe-transform-elastic, dfe-transform-splunk
55-
56-
### Completed This Session (2026-04-02)
57-
58-
- [x] **BatchEngine** — SIMD-optimised batch processing framework (15 commits)
59-
- sonic-rs SIMD JSON parse (2-4x faster than serde_json)
60-
- Pre-route field extraction via `get_from_slice` (skip full parse for filtered/DLQ)
61-
- FieldInterner (DashMap concurrent field name dedup)
62-
- Two tiers: mid-tier (materialised DOM) and full-tier (raw passthrough)
63-
- Transport-wired `run()` loop + standalone `process_mid_tier()`/`process_raw()`
64-
- Auto-wired from ServiceRuntime — zero boilerplate for apps
65-
- MemoryGuard-bounded chunking between batch chunks
66-
- 78 tests (unit + integration + adversarial + concurrent + 20K scale)
67-
- Criterion benchmarks (engine vs manual serde_json)
68-
- [x] **TopicResolver** — Kafka topic auto-discovery in rustlib transport-kafka
69-
- Configurable suppression rules (default: `_load` suppresses `_land`)
70-
- Include/exclude regex filters
71-
- TopicRefreshHandle for periodic re-resolution
72-
- Wired into KafkaTransport::new() — auto-discovers when `topics: []`
73-
- [x] **PipelineStats.filtered** — new atomic counter for pre-route filtered messages
74-
- [x] **AdaptiveWorkerPool.install()** — expose rayon pool for `par_iter_mut`
75-
- [x] **Dependencies** — sonic-rs, dashmap, bytes, regex added to worker/transport-kafka
80+
- [ ] Debug/trace logging increasing detail levels (all repos)
81+
82+
### Completed This Session
83+
84+
- [x] **BatchEngine** — 15 commits, SIMD batch processing framework
85+
- [x] **TopicResolver** — Kafka topic auto-discovery with suppression rules
86+
- [x] **PipelineStats.filtered** counter
87+
- [x] **AdaptiveWorkerPool.install()** exposure
88+
- [x] **Prometheus recorder test fix** — test-safe MetricsManager
89+
- [x] **Rustlib v2.4.3** published to crates.io
90+
- [x] **clickhouse-rs schema-cache-strong** branch — 3 commits + Nullable(JSON) fix
91+
- [x] **Upstream PR** ClickHouse/clickhouse-rs#414
92+
- [x] **dfe-loader** v1.16.4 — ServiceRuntime, TopicResolver deletion, schema cache fix, logging
93+
- [x] **dfe-receiver** v1.14.8 — ServiceRuntime, logging, target symlink + orphan ci fix
94+
- [x] **dfe-archiver** v1.6.1 — ServiceRuntime, logging, deployment contract fields
95+
- [x] **dfe-fetcher** v1.1.8 — ServiceRuntime, logging, SensitiveString fix
96+
- [x] **dfe-transform-vector** — ServiceRuntime, logging, SensitiveString fix
97+
- [x] **dfe-transform-vrl** — rustlib bump, logging (CI pending)
98+
- [x] **Target symlink fix** across 4 repos (receiver, archiver, fetcher, vrl)
99+
- [x] **Orphan ci submodule** removed from dfe-receiver
76100

77101
### Completed Previous Sessions
78102

79-
- [x] BatchAccumulator (bounded channel, time/count/bytes drain thresholds)
80-
- [x] NDJSON split utilities (split_lines, count_lines)
81-
- [x] Adversarial worker pool tests (34 tests)
82-
- [x] RuntimeContext + startupz integration tests
83-
- [x] ServiceRuntime (auto-wired DfeApp infrastructure)
84-
- [x] RuntimeContext (K8s metadata, cgroup limits)
85-
- [x] Deployment contract CI bridge (container-manifest.json, OCI labels)
103+
- [x] BatchAccumulator, NDJSON split, adversarial worker tests
104+
- [x] RuntimeContext, ServiceRuntime, deployment contract CI bridge
86105
- [x] Metrics manifest + CLI subcommands
87-
- [x] AdaptiveWorkerPool (rayon + tokio hybrid)
88-
- [x] BatchProcessor trait + BatchPipeline + PipelineStats
89-
- [x] Code review remediation — security fixes, panic removal, health wiring
90-
- [x] DLQ HTTP + Redis backends
91-
- [x] Transport trait split + factory + File/Pipe/HTTP/Redis transports
92-
- [x] HealthRegistry, Shutdown manager, OTel trace propagation
93-
- [x] KafkaTransport StatsContext, gRPC transport metrics
106+
- [x] AdaptiveWorkerPool, BatchProcessor, PipelineStats
107+
- [x] All transport backends, HealthRegistry, Shutdown manager
94108
- [x] Config registry, SensitiveString, /config endpoint
95109

96110
---
@@ -102,18 +116,6 @@ Per-app remediation includes:
102116
- [ ] GCP Secret Manager provider (`secrets-gcp` feature)
103117
- [ ] Azure Key Vault provider (`secrets-azure` feature)
104118

105-
### Kafka — Opinionated SASL-SCRAM Named Constructors
106-
107-
- [ ] `KafkaConfig::external_sasl_scram(brokers, username, password)`
108-
- [ ] `KafkaConfig::internal_sasl_scram(brokers, username, password)`
109-
110-
### Identity / Auth Module (Discussion)
111-
112-
- [ ] Token validation middleware (JWT/OIDC) for gRPC interceptor + axum middleware
113-
- [ ] Service identity (service name + instance ID for mTLS, audit logs)
114-
- [ ] Break-glass: static bearer token from secrets module
115-
- [ ] Design decision: dfe-engine as SSoT, rustlib validates tokens only
116-
117119
### Other
118120

119121
- [ ] PII anonymiser (evaluate Rust libraries)
@@ -123,11 +125,10 @@ Per-app remediation includes:
123125

124126
## Notes
125127

126-
- Transport backends: Kafka, gRPC, Memory, File, Pipe, HTTP, Redis/Valkey
127-
- Core pillars plan: `docs/superpowers/plans/2026-03-26-core-pillars.md`
128128
- BatchEngine spec: `docs/superpowers/specs/2026-04-02-batch-engine-design.md`
129129
- TopicResolver spec: `docs/superpowers/specs/2026-04-02-topic-resolver-design.md`
130-
- DFE remediation plan: `docs/superpowers/plans/2026-04-02-dfe-full-remediation.md`
131-
- Two deployment modes: Kafka-mediated (persistence) vs direct gRPC (low latency)
132-
- Routed transport is receiver/fetcher only — all other stages are 1:1
130+
- DFE remediation plan (Phase 1): `docs/superpowers/plans/2026-04-02-dfe-full-remediation.md`
131+
- DFE Phase 2 plan (amended): `docs/superpowers/plans/2026-04-03-dfe-phase2-deep-integration.md`
132+
- Phase 2A evolves the engine API first (async sink, ticker, transport adapter)
133+
- Phase 2B applies engine per-app: loader → VRL → archiver → receiver → fetcher → vector
133134
- Common patterns in rustlib first — if all 6 DFE projects use the same pattern, implement in rustlib

src/config/flat_env.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
//! The helper functions are designed to be called by `#[derive(FlatEnvOverrides)]`
2525
//! generated code, but are also usable standalone:
2626
//!
27-
//! ```rust
27+
//! ```rust,no_run
2828
//! use hyperi_rustlib::config::flat_env::*;
2929
//!
30-
//! # std::env::set_var("MYAPP_HOST", "example.com");
30+
//! // In production, env vars are set by the container/K8s ConfigMap.
31+
//! // std::env::set_var is unsafe in edition 2024 — use temp_env in tests.
3132
//! if let Some(host) = flat_env_string("MYAPP", "HOST") {
3233
//! println!("Host override: {host}");
3334
//! }
34-
//! # std::env::remove_var("MYAPP_HOST");
3535
//! ```
3636
3737
use std::str::FromStr;

src/config/registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//!
1919
//! # Auto-registration
2020
//!
21-
//! ```rust,no_run
21+
//! ```rust,ignore
2222
//! use hyperi_rustlib::config;
2323
//!
2424
//! // This automatically registers "expression" in the registry:

0 commit comments

Comments
 (0)