Skip to content

Commit 36c383d

Browse files
committed
feat!: split Transport trait and add 4 new transports + factory
BREAKING CHANGE: Transport trait split into TransportBase (close, is_healthy, name), TransportSender (send), and TransportReceiver (recv, commit, Token). Blanket Transport impl for types with both. New transport backends: - File: NDJSON with position tracking and commit persistence - Pipe: stdin/stdout for Unix pipeline composition - HTTP: POST to endpoint (send) + embedded axum server (receive) - Redis/Valkey Streams: XADD/XREADGROUP/XACK with consumer groups Transport factory: - AnySender: enum dispatch for runtime transport selection - AnySender::from_config(): create sender from config cascade - RoutedSender: per-key dispatch for data originators (receiver/fetcher) All transports auto-emit dfe_transport_* Prometheus metrics. 648 tests pass.
1 parent aa2afbc commit 36c383d

16 files changed

Lines changed: 2828 additions & 144 deletions

File tree

Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ transport-memory = ["transport"]
9191
transport-kafka = ["transport", "rdkafka"]
9292
transport-grpc = ["transport", "dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:tonic-prost-build", "dep:prost-build"]
9393
transport-grpc-vector-compat = ["transport-grpc"]
94-
transport-all = ["transport-memory", "transport-kafka", "transport-grpc"]
94+
transport-file = ["transport", "io"]
95+
transport-pipe = ["transport"]
96+
transport-http = ["transport", "http"]
97+
transport-redis = ["transport", "redis"]
98+
transport-all = ["transport-memory", "transport-kafka", "transport-grpc", "transport-file", "transport-pipe", "transport-http", "transport-redis"]
9599

96100
# Secrets management
97101
secrets = ["tokio", "serde_json", "async-trait", "parking_lot", "base64", "dirs", "tracing"]
@@ -155,7 +159,7 @@ metrics-exporter-opentelemetry = { version = ">=0.2.1, <0.3", optional = true }
155159
sysinfo = { version = ">=0.38.0, <0.39", optional = true }
156160

157161
# Async runtime (for metrics server, http-server)
158-
tokio = { version = ">=1.50.0, <2", features = ["rt-multi-thread", "net", "sync", "time", "macros", "signal", "fs"], optional = true }
162+
tokio = { version = ">=1.50.0, <2", features = ["rt-multi-thread", "net", "sync", "time", "macros", "signal", "fs", "io-std", "io-util"], optional = true }
159163

160164
# HTTP client — pinned to reqwest 0.12 until vaultrs and opentelemetry-otlp
161165
# support 0.13. reqwest-middleware 0.4 and reqwest-retry 0.7 target 0.12.
@@ -183,6 +187,9 @@ rmp-serde = { version = ">=1.3.1, <2", optional = true }
183187
# Kafka transport (dynamic-linking: use system librdkafka instead of compiling C++ from source)
184188
rdkafka = { version = ">=0.39.0, <0.40", features = ["dynamic-linking"], optional = true }
185189

190+
# Redis/Valkey Streams transport
191+
redis = { version = ">=1.0, <2", features = ["tokio-comp", "streams"], optional = true }
192+
186193
# gRPC transport (tonic + prost)
187194
tonic = { version = ">=0.14, <0.15", features = ["gzip"], optional = true }
188195
tonic-prost = { version = ">=0.14.5, <0.15", optional = true }

src/transport/factory.rs

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
// Project: hyperi-rustlib
2+
// File: src/transport/factory.rs
3+
// Purpose: Transport factory — create senders from config
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! Transport factory for runtime transport selection.
10+
//!
11+
//! Creates transport senders from configuration, enabling apps to swap
12+
//! between Kafka, gRPC, file, pipe, HTTP, or Redis via config change.
13+
//!
14+
//! # Usage
15+
//!
16+
//! ```yaml
17+
//! # settings.yaml
18+
//! transport:
19+
//! output:
20+
//! type: kafka
21+
//! kafka:
22+
//! brokers: ["kafka:9092"]
23+
//! ```
24+
//!
25+
//! ```rust,ignore
26+
//! use hyperi_rustlib::transport::factory::AnySender;
27+
//!
28+
//! let sender = AnySender::from_config("transport.output").await?;
29+
//! sender.send("events.land", payload).await;
30+
//! ```
31+
32+
use super::error::{TransportError, TransportResult};
33+
use super::traits::{TransportBase, TransportSender};
34+
use super::types::{SendResult, TransportType};
35+
36+
/// Type-erased transport sender.
37+
///
38+
/// Wraps any concrete transport sender behind an enum for runtime
39+
/// dispatch. Created by the transport factory from config.
40+
///
41+
/// Uses enum dispatch (not trait objects) because `TransportSender`
42+
/// has `impl Future` return types which prevent `dyn` dispatch.
43+
pub enum AnySender {
44+
#[cfg(feature = "transport-kafka")]
45+
Kafka(super::kafka::KafkaTransport),
46+
47+
#[cfg(feature = "transport-grpc")]
48+
Grpc(super::grpc::GrpcTransport),
49+
50+
#[cfg(feature = "transport-memory")]
51+
Memory(super::memory::MemoryTransport),
52+
53+
#[cfg(feature = "transport-pipe")]
54+
Pipe(super::pipe::PipeTransport),
55+
56+
#[cfg(feature = "transport-file")]
57+
File(super::file::FileTransport),
58+
59+
#[cfg(feature = "transport-http")]
60+
Http(super::http::HttpTransport),
61+
62+
#[cfg(feature = "transport-redis")]
63+
Redis(super::redis_transport::RedisTransport),
64+
}
65+
66+
impl TransportBase for AnySender {
67+
async fn close(&self) -> TransportResult<()> {
68+
match self {
69+
#[cfg(feature = "transport-kafka")]
70+
Self::Kafka(t) => t.close().await,
71+
#[cfg(feature = "transport-grpc")]
72+
Self::Grpc(t) => t.close().await,
73+
#[cfg(feature = "transport-memory")]
74+
Self::Memory(t) => t.close().await,
75+
#[cfg(feature = "transport-pipe")]
76+
Self::Pipe(t) => t.close().await,
77+
#[cfg(feature = "transport-file")]
78+
Self::File(t) => t.close().await,
79+
#[cfg(feature = "transport-http")]
80+
Self::Http(t) => t.close().await,
81+
#[cfg(feature = "transport-redis")]
82+
Self::Redis(t) => t.close().await,
83+
}
84+
}
85+
86+
fn is_healthy(&self) -> bool {
87+
match self {
88+
#[cfg(feature = "transport-kafka")]
89+
Self::Kafka(t) => t.is_healthy(),
90+
#[cfg(feature = "transport-grpc")]
91+
Self::Grpc(t) => t.is_healthy(),
92+
#[cfg(feature = "transport-memory")]
93+
Self::Memory(t) => t.is_healthy(),
94+
#[cfg(feature = "transport-pipe")]
95+
Self::Pipe(t) => t.is_healthy(),
96+
#[cfg(feature = "transport-file")]
97+
Self::File(t) => t.is_healthy(),
98+
#[cfg(feature = "transport-http")]
99+
Self::Http(t) => t.is_healthy(),
100+
#[cfg(feature = "transport-redis")]
101+
Self::Redis(t) => t.is_healthy(),
102+
}
103+
}
104+
105+
fn name(&self) -> &'static str {
106+
match self {
107+
#[cfg(feature = "transport-kafka")]
108+
Self::Kafka(t) => t.name(),
109+
#[cfg(feature = "transport-grpc")]
110+
Self::Grpc(t) => t.name(),
111+
#[cfg(feature = "transport-memory")]
112+
Self::Memory(t) => t.name(),
113+
#[cfg(feature = "transport-pipe")]
114+
Self::Pipe(t) => t.name(),
115+
#[cfg(feature = "transport-file")]
116+
Self::File(t) => t.name(),
117+
#[cfg(feature = "transport-http")]
118+
Self::Http(t) => t.name(),
119+
#[cfg(feature = "transport-redis")]
120+
Self::Redis(t) => t.name(),
121+
}
122+
}
123+
}
124+
125+
impl TransportSender for AnySender {
126+
async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
127+
match self {
128+
#[cfg(feature = "transport-kafka")]
129+
Self::Kafka(t) => t.send(key, payload).await,
130+
#[cfg(feature = "transport-grpc")]
131+
Self::Grpc(t) => t.send(key, payload).await,
132+
#[cfg(feature = "transport-memory")]
133+
Self::Memory(t) => t.send(key, payload).await,
134+
#[cfg(feature = "transport-pipe")]
135+
Self::Pipe(t) => t.send(key, payload).await,
136+
#[cfg(feature = "transport-file")]
137+
Self::File(t) => t.send(key, payload).await,
138+
#[cfg(feature = "transport-http")]
139+
Self::Http(t) => t.send(key, payload).await,
140+
#[cfg(feature = "transport-redis")]
141+
Self::Redis(t) => t.send(key, payload).await,
142+
}
143+
}
144+
}
145+
146+
impl AnySender {
147+
/// Create a sender from config cascade.
148+
///
149+
/// Reads the transport config from the given key in the config
150+
/// cascade and creates the appropriate sender.
151+
///
152+
/// # Example config
153+
///
154+
/// ```yaml
155+
/// transport:
156+
/// output:
157+
/// type: kafka
158+
/// kafka:
159+
/// brokers: ["kafka:9092"]
160+
/// ```
161+
///
162+
/// ```rust,ignore
163+
/// let sender = AnySender::from_config("transport.output").await?;
164+
/// ```
165+
pub async fn from_config(key: &str) -> TransportResult<Self> {
166+
#[cfg(feature = "config")]
167+
let config = {
168+
let cfg = crate::config::try_get()
169+
.ok_or_else(|| TransportError::Config("config not initialised".into()))?;
170+
cfg.unmarshal_key::<super::TransportConfig>(key)
171+
.map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
172+
};
173+
174+
#[cfg(not(feature = "config"))]
175+
let config = super::TransportConfig::default();
176+
177+
Self::from_transport_config(&config).await
178+
}
179+
180+
/// Create a sender from an explicit `TransportConfig`.
181+
pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
182+
match config.transport_type {
183+
#[cfg(feature = "transport-kafka")]
184+
TransportType::Kafka => {
185+
let kafka_config = config
186+
.kafka
187+
.as_ref()
188+
.ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
189+
let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
190+
Ok(Self::Kafka(transport))
191+
}
192+
193+
#[cfg(feature = "transport-grpc")]
194+
TransportType::Grpc => {
195+
let grpc_config = config
196+
.grpc
197+
.as_ref()
198+
.ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
199+
let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
200+
Ok(Self::Grpc(transport))
201+
}
202+
203+
#[cfg(feature = "transport-memory")]
204+
TransportType::Memory => {
205+
let memory_config = config.memory.clone().unwrap_or_default();
206+
let transport = super::memory::MemoryTransport::new(&memory_config);
207+
Ok(Self::Memory(transport))
208+
}
209+
210+
#[cfg(feature = "transport-pipe")]
211+
TransportType::Pipe => {
212+
let pipe_config = config.pipe.clone().unwrap_or_default();
213+
let transport = super::pipe::PipeTransport::new(&pipe_config);
214+
Ok(Self::Pipe(transport))
215+
}
216+
217+
#[cfg(feature = "transport-file")]
218+
TransportType::File => {
219+
let file_config = config
220+
.file
221+
.as_ref()
222+
.ok_or_else(|| TransportError::Config("file config missing".into()))?;
223+
let transport = super::file::FileTransport::new(file_config).await?;
224+
Ok(Self::File(transport))
225+
}
226+
227+
#[cfg(feature = "transport-http")]
228+
TransportType::Http => {
229+
let http_config = config
230+
.http
231+
.as_ref()
232+
.ok_or_else(|| TransportError::Config("http config missing".into()))?;
233+
let transport = super::http::HttpTransport::new(http_config).await?;
234+
Ok(Self::Http(transport))
235+
}
236+
237+
#[cfg(feature = "transport-redis")]
238+
TransportType::Redis => {
239+
let redis_config = config
240+
.redis
241+
.as_ref()
242+
.ok_or_else(|| TransportError::Config("redis config missing".into()))?;
243+
let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
244+
Ok(Self::Redis(transport))
245+
}
246+
247+
// Transport types for modules not yet implemented
248+
#[allow(unreachable_patterns)]
249+
other => Err(TransportError::Config(format!(
250+
"transport type '{other}' is not available (feature not enabled or not yet implemented)"
251+
))),
252+
}
253+
}
254+
}

0 commit comments

Comments
 (0)