Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ name = "hyperi-rustlib"
version = "1.19.8"
edition = "2024"
rust-version = "1.94"
description = "Shared utility library for HyperI Rust applications"
description = "Opinionated Rust framework for high-throughput data pipelines at PB scale. Auto-wiring config, logging, metrics, tracing, health, and graceful shutdown — built from many years of production infrastructure experience."
license = "FSL-1.1-ALv2"
repository = "https://github.com/hyperi-io/hyperi-rustlib"
publish = true
Expand All @@ -20,9 +20,11 @@ categories = ["development-tools"]
exclude = [".claude/", ".github/", "ci/", "ai/", "docs/", "examples/", "benches/", "scripts/"]

[features]
default = ["config", "logger", "metrics", "runtime"]
default = ["config", "logger", "metrics", "runtime", "shutdown", "health"]

# Core features
health = []
shutdown = ["tokio", "tokio-util"]
runtime = ["dirs"]
config = ["figment", "dotenvy", "serde_yaml_ng", "serde_json", "toml", "dirs", "tracing"]
logger = ["tracing", "tracing-subscriber", "owo-colors", "serde_json", "tracing-throttle"]
Expand Down Expand Up @@ -91,7 +93,11 @@ transport-memory = ["transport"]
transport-kafka = ["transport", "rdkafka"]
transport-grpc = ["transport", "dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:tonic-prost-build", "dep:prost-build"]
transport-grpc-vector-compat = ["transport-grpc"]
transport-all = ["transport-memory", "transport-kafka", "transport-grpc"]
transport-file = ["transport", "io"]
transport-pipe = ["transport"]
transport-http = ["transport", "http"]
transport-redis = ["transport", "redis"]
transport-all = ["transport-memory", "transport-kafka", "transport-grpc", "transport-file", "transport-pipe", "transport-http", "transport-redis"]

# Secrets management
secrets = ["tokio", "serde_json", "async-trait", "parking_lot", "base64", "dirs", "tracing"]
Expand All @@ -100,7 +106,7 @@ secrets-aws = ["secrets", "aws-config", "aws-sdk-secretsmanager"]
secrets-all = ["secrets-vault", "secrets-aws"]

# Full feature set
full = ["config", "config-reload", "logger", "metrics", "metrics-dfe", "otel", "otel-metrics", "runtime", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "transport-grpc-vector-compat", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check", "scaling", "memory", "cli", "io", "dlq", "dlq-kafka", "output-file", "expression"]
full = ["config", "config-reload", "logger", "metrics", "metrics-dfe", "otel", "otel-metrics", "runtime", "shutdown", "health", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "transport-grpc-vector-compat", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check", "scaling", "memory", "cli", "io", "dlq", "dlq-kafka", "output-file", "expression"]

[dependencies]
# Serialisation (always needed)
Expand Down Expand Up @@ -154,8 +160,9 @@ metrics-util = { version = ">=0.20.1, <0.21", optional = true }
metrics-exporter-opentelemetry = { version = ">=0.2.1, <0.3", optional = true }
sysinfo = { version = ">=0.38.0, <0.39", optional = true }

# Async runtime (for metrics server, http-server)
tokio = { version = ">=1.50.0, <2", features = ["rt-multi-thread", "net", "sync", "time", "macros", "signal", "fs"], optional = true }
# Async runtime (for metrics server, http-server, shutdown)
tokio = { version = ">=1.50.0, <2", features = ["rt-multi-thread", "net", "sync", "time", "macros", "signal", "fs", "io-std", "io-util"], optional = true }
tokio-util = { version = ">=0.7.14, <0.8", optional = true }

# HTTP client — pinned to reqwest 0.12 until vaultrs and opentelemetry-otlp
# support 0.13. reqwest-middleware 0.4 and reqwest-retry 0.7 target 0.12.
Expand Down Expand Up @@ -183,6 +190,9 @@ rmp-serde = { version = ">=1.3.1, <2", optional = true }
# Kafka transport (dynamic-linking: use system librdkafka instead of compiling C++ from source)
rdkafka = { version = ">=0.39.0, <0.40", features = ["dynamic-linking"], optional = true }

# Redis/Valkey Streams transport
redis = { version = ">=1.0, <2", features = ["tokio-comp", "streams"], optional = true }

# gRPC transport (tonic + prost)
tonic = { version = ">=0.14, <0.15", features = ["gzip"], optional = true }
tonic-prost = { version = ">=0.14.5, <0.15", optional = true }
Expand Down
28 changes: 28 additions & 0 deletions STATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,34 @@ CARGO_BUILD_JOBS=2 cargo clippy

---

## Core Pillars (Non-Negotiable Design Decision)

Every module in hyperi-rustlib MUST auto-integrate with the core infrastructure
pillars using the **global singleton pattern**. Services get observability for
free — no handles passed, no opt-in, no extra code in downstream apps.

| Pillar | Singleton | Module | Pattern |
|--------|-----------|--------|---------|
| Config | `OnceLock<Config>` | `config` | `T::from_cascade()` reads from global figment |
| Logging | Global `tracing` subscriber | `logger` | `tracing::info!()` macros — always available |
| Metrics | Global `metrics` recorder | `metrics` | `metrics::counter!()` macros — no-op if no recorder |
| Tracing | Global OTel subscriber | `otel` | Trace context auto-propagated (planned: always-on) |
| Health | Global `HealthState` | `http-server` | Unified readiness flag (planned: auto-wired) |
| Shutdown | `CancellationToken` | (planned) | Unified graceful shutdown (planned: auto-wired) |

**Rule:** When adding ANY new module or feature to rustlib:
1. If it has configurable behaviour → load from cascade via `from_cascade()`
2. If it does I/O or processing → add `#[cfg(feature = "metrics")]` counters/gauges/histograms
3. If it can fail or has interesting state → add `tracing::` log calls
4. If it affects service health → report into unified `HealthState`

**The goal:** A DFE app that does `MetricsManager::new("dfe_loader")` +
`logger::setup_default()` + `config::setup()` at startup gets full
observability across every rustlib feature it uses — transport, tiered-sink,
spool, cache, secrets, HTTP client, DLQ — with zero additional wiring.

---

## Decisions

- **Dynamic linking for C deps** — rdkafka, libgit2, zstd, zlib, openssl all link against system libs via pkg-config. Eliminates ~30min C++ build for rdkafka. aws-lc-sys is the one exception (AWS SDK hardcodes it, no opt-out).
Expand Down
184 changes: 56 additions & 128 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,164 +1,92 @@
# TODO - hyperi-rustlib

**Project Goal:** Rust shared library equivalent to hyperi-pylib (Python) and hyperi-golib (Go)
**Project Goal:** Opinionated Rust framework for high-throughput data pipelines at PB scale

**Target:** Production-ready library for HyperI Rust applications
**Target:** Production-ready library with auto-wiring config, logging, metrics, tracing, health, and graceful shutdown

---

## Current Tasks

### Kafka Transport Metrics Parity with gRPC `[NEXT]`
### v2.0.0 Release `[NEXT]`

gRPC transport (v1.19.7) auto-emits `dfe_transport_*` metrics. Kafka transport does not — two gaps:
All core pillar work is done. Need to:
- [ ] Release-merge to release branch (feat!: breaking change → v2.0.0)
- [ ] Verify crates.io publication
- [ ] Docs consolidation (TRANSPORT.md, CORE-PILLARS.md, per-feature docs)
- [ ] Add Redis vs Kafka comparison table to transport docs

1. **Add metrics instrumentation to `KafkaTransport::send()`** — emit `dfe_transport_sent_total{transport="kafka"}`, `dfe_transport_send_errors_total{transport="kafka"}`, `dfe_transport_backpressured_total{transport="kafka"}`, and send duration histogram, matching gRPC parity.
### DLQ Transport Integration

2. **Wire `StatsContext` into `KafkaTransport`** — currently `new_with_context()` is a stub that ignores the context. The struct uses `DefaultConsumerContext` / plain `FutureProducer`, so `statistics.interval.ms` callbacks (set to 1000ms by all profiles) go to a no-op. Need to either make the struct generic over context (complicates `Transport` trait impl) or use `StatsContext` by default when the `metrics` feature is enabled. This enables `rdkafka_broker_rtt_avg_seconds`, `rdkafka_global_msg_cnt`, `rdkafka_topic_partition_consumer_lag`, etc.
- [ ] DLQ Kafka backend uses `Box<dyn TransportSender>` / `AnySender` instead of raw producer
- [ ] DLQ can write to any transport (file, HTTP, Redis, Kafka)

Downstream impact: dfe-fetcher, dfe-receiver, dfe-loader all use `KafkaTransport` and would get these for free once wired.
### Identity / Auth Module (Discussion)

### Gap Analysis P2 — HTTP Client, Database URLs, Cache
- [ ] Token validation middleware (JWT/OIDC) for gRPC interceptor + axum middleware
- [ ] Service identity (service name + instance ID for mTLS, audit logs)
- [ ] Break-glass: static bearer token from secrets module
- [ ] Design decision: dfe-engine as SSoT, rustlib validates tokens only

- [ ] HTTP client module with retry middleware (reqwest + reqwest-middleware + reqwest-retry)
- Wrap reqwest with exponential backoff, configurable timeouts
- Auto-register config via `unmarshal_key_registered`
- Metrics integration (request count, duration, errors)
- [ ] Database URL builders (PostgreSQL, ClickHouse, Redis)
- Build connection strings from env vars with standard prefixes
- `SensitiveString` for password fields
- [ ] Cache module with disk/memory backends
- Consolidate secrets cache pattern into reusable module
- TTL, stale-while-revalidate, size bounds
### Downstream Remediation

### Completed Recent

- [x] **Config registry** (v1.19.3-v1.19.5) — auto-registering reflectable config, `/config` admin endpoint, `SensitiveString`, heuristic redaction, change notification, `ConfigReloader` hook
- [x] **CEL expression profile** (v1.19.2) — `matches()` blocked by default, `ProfileConfig` with per-category overrides, string literal false-positive prevention
- [x] **Config cascade wiring** (v1.19.2) — expression, memory, version_check, scaling, grpc, secrets auto-read from figment cascade
- [x] **MemoryGuard underflow fix** (v1.19.1) — `fetch_sub` replaced with `fetch_update` + `saturating_sub`
- [x] **Test restructure** (v1.19.1) — `tests/integration/`, `tests/e2e/`, `tests/common/` per testing standard
- [x] **hyperi-ci release-merge** — CLI command replaces per-project workflow files
- [x] **Rust edition 2024** — migrated from 2021; `temp-env` replaces unsafe `set_var`/`remove_var` in tests across 6 files
- [x] **async-trait removal** — public traits (`Sink`, `Transport`, `SecretProvider`) now use `fn ... -> impl Future + Send` (Rust 1.75+ native)
- [x] **kafka_config module** — `config_from_file`, 7 named profiles, `merge_with_overrides`; librdkafka settings loaded from config git dir (only cascade exception)
- [x] **File output sink** — `src/io/`, `src/output/`, `output-file` feature
- [x] **CLI module** — CommonArgs, StandardCommand, DfeApp trait (`cli` feature)
- [x] **Top module** — ratatui TUI dashboard, Prometheus parser, oneshot mode (`top` feature)
- [x] **CI gating fix** — Semantic Release now gated on CI success via workflow_run
- [ ] Migrate dfe-loader to v2.0.0 (transport factory, remove boilerplate)
- [ ] Migrate dfe-receiver to v2.0.0 (RoutedSender, transport factory)
- [ ] Migrate dfe-archiver to v2.0.0
- [ ] Migrate dfe-fetcher to v2.0.0
- [ ] Migrate dfe-transform-wasm to v2.0.0
- [ ] Migrate dfe-transform-vrl to v2.0.0
- [ ] Audit hyperi-pylib and write alignment plan

---

## Completed

- [x] Vector compat integration tests — 6 tests using real Vector binary + VectorCompatClient (fetch-vector.sh + YAML)
- [x] vault_env integration tests fixed — clear_vault_env() prevents VAULT_TOKEN leakage
- [x] Dependency update sweep — all crates to latest, tonic/prost 0.14 migration (v1.8.4)
- [x] Stale hs-rustlib removed from JFrog hypersec-cargo-local and hyperi-cargo-local
- [x] MaskingLayer fixed — writer-based redaction for both JSON and text formats (v1.8.4)
- [x] Logger output capturing tests — 10 tests (JSON, text, filtering, masking)
- [x] Coloured log output — custom FormatEvent with owo-colors colour scheme
- [x] Metrics graceful shutdown tests — 4 tests (shutdown, rapid cycle, render after stop, concurrent)
- [x] gRPC transport integration tests — 8 tests (send/recv, ordering, large payload, compression)
- [x] gRPC transport with Vector wire protocol compatibility (v1.8.0)
- tonic-based gRPC replacing Zenoh transport
- DFE native proto (`dfe.transport.v1`) + vendored Vector proto
- Vector compat source/sink for migration from Vector pipelines
- build.rs for conditional proto code generation
- [x] Zenoh transport removed — replaced by gRPC (v1.8.0)
- [x] Version check module — startup check against releases.hyperi.io (v1.7.0)
- [x] Deployment validation module — Helm chart and Dockerfile contract checks (v1.7.0)
- [x] CI: ARC self-hosted runners enabled (v1.7.1–v1.8.3)
- [x] Clippy/formatting fixes — approx_constant lint, dprint float formatting (v1.8.1–v1.8.3)
- [x] Package rename: hs-rustlib -> hyperi-rustlib, published v1.4.3 to JFrog
- [x] Rebrand: HyperSec -> HyperI across source, docs, configs, workflows
- [x] Registry migration: hypersec registry -> hyperi registry
- [x] Submodule URLs: hypersec-io -> hyperi-io
- [x] CI config: .hypersec-ci.yaml -> .hyperi-ci.yaml
- [x] Directory-config store with git2 integration (v1.4.0)
- [x] OpenTelemetry metrics support (v1.4.0)
- [x] Secrets management module (OpenBao/Vault, AWS) (v1.3.x)
- [x] HTTP server module (axum-based) (v1.2.0)
- [x] Transport module (Kafka/Memory abstraction)
- [x] TieredSink module (disk spillover with circuit breaker)
- [x] Spool module (disk-backed queue)
- [x] Configuration module (7-layer cascade with figment)
- [x] Logger module (structured JSON, RFC3339, masking)
- [x] Metrics module (Prometheus + process/container)
- [x] Environment detection module
- [x] Runtime paths module (XDG + container awareness)
- [x] Dependency audit (serde_yml -> serde-yaml-ng, queue-file -> yaque, once_cell -> LazyLock)
- [x] Config cascade alignment with hyperi-pylib unified spec (v1.6.0)
- load_home_dotenv default false, app_name support, container/user config paths
- Created docs/CONFIG-CASCADE.md
- PG layer documented as built-for-not-with (YAML gitops covers current needs)
### Completed This Session

- [x] **Transport trait split** — `Transport` split into `TransportBase` + `TransportSender` + `TransportReceiver` with blanket `Transport` impl
- [x] **Transport factory** — `AnySender` enum dispatch from config, `RoutedSender` for per-key dispatch (receiver/fetcher only)
- [x] **File transport** — NDJSON with position tracking, commit persistence, rotation
- [x] **Pipe transport** — stdin/stdout for Unix pipeline composition
- [x] **HTTP transport** — POST send + embedded axum receive (bidirectional)
- [x] **Redis/Valkey Streams transport** — XADD/XREADGROUP/XACK with consumer groups
- [x] **HealthRegistry** — global singleton, modules auto-register health check closures, `/readyz` aggregates, `/health/detailed` JSON
- [x] **Shutdown manager** — global CancellationToken, SIGTERM/SIGINT handler, modules listen on token
- [x] **OTel trace propagation** — W3C traceparent auto-injected/extracted in gRPC, Kafka, HTTP transports
- [x] **Universal metrics** — all modules auto-emit Prometheus metrics via global recorder
- [x] **Logging + config cascade** — added to all new transports
- [x] **Health wiring** — Kafka, gRPC, CircuitBreaker, HttpServer auto-register
- [x] **Shutdown wiring** — HttpServer, TieredSink drainer, ConfigReloader listen on global token
- [x] **KafkaTransport StatsContext** — always-on, rdkafka_* metrics auto-emitted
- [x] **gRPC transport metrics** — dfe_transport_* parity with Kafka
- [x] **HTTP client, database URL builders, cache modules** (v1.19.6)
- [x] **Config registry, SensitiveString, /config endpoint** (v1.19.3-v1.19.5)
- [x] **Dependency updates, cargo-audit ignores** (v1.19.6)

---

## Backlog (P1 - Config Registry)

### Reflectable Config Registry

Central registry where every module registers its config section at startup.
Currently modules independently call `unmarshal_key()` — no visibility into
what config keys exist, their types, defaults, or descriptions.

**Goal:** Any DFE app can list/dump/expose all available config sections.

- [x] Auto-registration via `unmarshal_key_registered` — records `(key, type_name, defaults, effective)` in global registry. Zero code changes in downstream apps.
- [x] `registry::sections()` — list all registered sections
- [x] `registry::dump_effective()` — JSON map of effective values
- [x] `registry::dump_defaults()` — JSON map of defaults (via `T::default()`)
- [x] Heuristic auto-redaction (password, secret, token, key, credential, auth, private, cert, encryption)
- [x] `#[serde(skip_serializing)]` as additional layer for fields that should never appear
- [x] expression, memory, version_check, scaling, grpc, secrets wired with `from_cascade()` auto-register
- [x] Modules without defaults (tiered_sink, http_server, kafka, spool, dlq) use `unmarshal_key_registered` from downstream apps
- [x] `/config` admin endpoint (opt-in via `enable_config_endpoint`) — returns redacted effective + defaults JSON
- [x] Change notification (opt-in) — `registry::on_change(key, callback)` + `registry::update()`
- Modules that need hot-reload subscribe; others keep `OnceLock` (init-once)
- [x] `ConfigReloader.with_registry_update(key)` connects hot-reload to registry
- [x] `SensitiveString` type — compile-time safe, `Serialize` always redacts
- [x] 19 registry + 12 sensitive string tests covering all redaction guarantees
- [ ] Migrate all dfe-* and hyperi-* apps to `unmarshal_key_registered` pattern
- [ ] Align hyperi-pylib with same registry pattern

---

## Backlog (P2 - from Gap Analysis)

### Phase 1 - Core Enterprise

- [ ] Database URL builders module (PostgreSQL, Redis)
- [ ] HTTP client module with retry middleware (reqwest-retry)
## Backlog

### Secrets Providers

- [ ] GCP Secret Manager provider (`secrets-gcp` feature, `google-cloud-secretmanager` crate)
- [ ] Azure Key Vault provider (`secrets-azure` feature, `azure_security_keyvault` crate)
- [ ] GCP Secret Manager provider (`secrets-gcp` feature)
- [ ] Azure Key Vault provider (`secrets-azure` feature)

### Phase 2 - Enhanced Features
### Kafka — Opinionated SASL-SCRAM Named Constructors

- [ ] Cache module with disk/Redis backing
- [ ] CLI framework helpers (wrap Clap)
- [ ] `KafkaConfig::external_sasl_scram(brokers, username, password)`
- [ ] `KafkaConfig::internal_sasl_scram(brokers, username, password)`

### Phase 3 - Advanced
### Other

- [ ] Standalone Kafka client (if transport layer insufficient)
- [ ] PII anonymiser (evaluate Rust libraries)
- [ ] Python bindings for ClickHouse client (PyO3)

### Kafka — Opinionated SASL-SCRAM Named Constructors

- [ ] Add `KafkaConfig::external_sasl_scram(brokers, username, password)` — SASL_SSL + SCRAM-SHA-512
- [ ] Add `KafkaConfig::internal_sasl_scram(brokers, username, password)` — SASL_PLAINTEXT + SCRAM-SHA-512
- [ ] Encodes the decision once: SCRAM works unchanged on Apache Kafka, AutoMQ, MSK, Confluent Cloud
- [ ] Remove per-project manual assembly of protocol + sasl + tls fields in dfe-loader, dfe-receiver

---

## Notes

- Use `CARGO_BUILD_JOBS=2` for all cargo commands
- Transport backends: Kafka, gRPC (native + Vector compat), Memory (Zenoh removed in v1.8.0)
- See docs/GAP_ANALYSIS.md for detailed comparison with hyperi-pylib
- See docs/CLICKHOUSE_PYTHON_BINDINGS.md for Python binding proposal
- Transport backends: Kafka, gRPC, Memory, File, Pipe, HTTP, Redis/Valkey
- Core pillars plan: `docs/superpowers/plans/2026-03-26-core-pillars.md`
- Two deployment modes: Kafka-mediated (persistence) vs direct gRPC (low latency)
- Routed transport is receiver/fetcher only — all other stages are 1:1
- Breaking change: `feat!:` commit triggers v2.0.0 via semantic-release
Loading