Skip to content

Commit 0f592ca

Browse files
committed
fix: introduce concurrency primitives; reshape DLQ around BackgroundSink
New `concurrency` feature: BackgroundSink<T>, PeriodicWorker, ActorHandle<Cmd>. DLQ now uses BackgroundSink<DlqEntry> with enum-dispatch backends; cascade moves into the drain task. AsyncNdjsonWriter offloads sync rotate-and-write to spawn_blocking; FileOutput gains async API. config/reloader, version_check, directory_config sync-I/O-on-async fixed. /tmp/hyperi fallback removed. clippy::await_holding_lock denied crate-wide. New tests/sync_in_async.rs grep lint (0 violations) and benches/concurrency_patterns.rs (try_push 28-35ns, try_send 63ns). 1280 tests pass. Downstream apps migrate: Dlq::file_only / Dlq::with_kafka -> Dlq::spawn( config, name, kafka_opt, shutdown). FileDlq / KafkaDlq / HttpDlq / RedisDlq types now crate-private. Dlq::send.await returns when queued; use Dlq::flush.await for durability. Dlq::add_backend removed. Publish: true
1 parent 9af33d9 commit 0f592ca

26 files changed

Lines changed: 3250 additions & 832 deletions

File tree

Cargo.toml

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ runtime = ["dirs"]
3636
config = ["figment", "dotenvy", "serde_yaml_ng", "serde_json", "dirs", "tracing"]
3737
logger = ["tracing", "tracing-subscriber", "owo-colors", "serde_json", "tracing-throttle"]
3838

39+
# Three generic async primitives (BackgroundSink, PeriodicWorker, ActorHandle).
40+
# Foundational helpers for any module that does fire-and-forget durable
41+
# writes, timer-driven loops, or stateful command-queue actors. See
42+
# `src/concurrency/mod.rs` for the decision matrix.
43+
concurrency = ["tokio", "tokio-util", "tracing", "metrics-core"]
44+
3945
# Metrics tiers (2.6.0 split). Consumers pay only for what they use:
4046
# - metrics-core: emit-only (counter!/gauge!/histogram! macros work)
4147
# - metrics-process: + cgroup-aware process gauges
@@ -99,10 +105,10 @@ cli-service = ["cli", "metrics", "memory", "scaling", "worker-pool", "shutdown"]
99105
top = ["cli-service", "ratatui"]
100106

101107
# Shared NDJSON file I/O (used by dlq + output-file)
102-
io = ["file-rotate", "parking_lot", "chrono", "tracing"]
108+
io = ["file-rotate", "parking_lot", "chrono", "tracing", "tokio"]
103109

104110
# Dead letter queue (file backend always available)
105-
dlq = ["io", "base64", "serde_json", "tokio", "async-trait", "tracing"]
111+
dlq = ["io", "concurrency", "base64", "serde_json", "tokio", "tokio-util", "tracing"]
106112
dlq-kafka = ["dlq", "transport-kafka"]
107113
dlq-http = ["dlq", "reqwest"]
108114
dlq-redis = ["dlq", "transport-redis"]
@@ -358,3 +364,8 @@ required-features = ["worker"]
358364
name = "filter_benchmark"
359365
harness = false
360366
required-features = ["transport-memory"]
367+
368+
[[bench]]
369+
name = "concurrency_patterns"
370+
harness = false
371+
required-features = ["concurrency"]

benches/concurrency_patterns.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Project: hyperi-rustlib
2+
// File: benches/concurrency_patterns.rs
3+
// Purpose: Criterion benchmarks for the three async primitives
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! Benchmarks for `concurrency::BackgroundSink`, `PeriodicWorker`,
10+
//! `ActorHandle`.
11+
//!
12+
//! Validates the design assumptions:
13+
//!
14+
//! - `BackgroundSink::try_push` is ~100 ns happy path (the consumer hot
15+
//! path target — slower than this means we've regressed the headline
16+
//! guarantee).
17+
//! - `ActorHandle::try_send` is in the same ballpark (~100 ns).
18+
//! - Both stay below 500 ns p99 under contention from many concurrent
19+
//! producers.
20+
//!
21+
//! Run with `cargo bench --bench concurrency_patterns --features concurrency`.
22+
23+
use std::sync::Arc;
24+
use std::sync::atomic::{AtomicU64, Ordering};
25+
use std::time::Duration;
26+
27+
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
28+
use tokio::runtime::Runtime;
29+
use tokio_util::sync::CancellationToken;
30+
31+
use hyperi_rustlib::concurrency::{
32+
Actor, ActorConfig, ActorHandle, BackgroundSink, BackgroundSinkConfig, DrainError, Overflow,
33+
SinkDrain,
34+
};
35+
36+
/// Drain that counts incoming messages atomically. Trivial — keeps the
37+
/// bench focused on the producer hot path, not the drain.
38+
struct CountingDrain {
39+
count: Arc<AtomicU64>,
40+
}
41+
42+
impl SinkDrain<u64> for CountingDrain {
43+
async fn write_batch(&mut self, batch: Vec<u64>) -> Result<(), DrainError> {
44+
self.count.fetch_add(batch.len() as u64, Ordering::Relaxed);
45+
Ok(())
46+
}
47+
}
48+
49+
/// Counter actor. Sums each pushed value.
50+
struct CounterActor {
51+
sum: u64,
52+
}
53+
54+
impl Actor for CounterActor {
55+
type Command = u64;
56+
async fn handle(&mut self, cmd: u64) {
57+
self.sum = self.sum.wrapping_add(cmd);
58+
}
59+
}
60+
61+
fn bench_background_sink_try_push(c: &mut Criterion) {
62+
let rt = Runtime::new().expect("tokio rt");
63+
let mut group = c.benchmark_group("background_sink");
64+
group.throughput(Throughput::Elements(1));
65+
66+
group.bench_function("try_push_drop_mode", |b| {
67+
let count = Arc::new(AtomicU64::new(0));
68+
let shutdown = CancellationToken::new();
69+
let (sink, _handle) = rt.block_on(async {
70+
BackgroundSink::spawn(
71+
CountingDrain {
72+
count: count.clone(),
73+
},
74+
BackgroundSinkConfig {
75+
queue_capacity: 1_000_000,
76+
batch_size: 4096,
77+
flush_interval: Duration::from_millis(50),
78+
overflow: Overflow::Drop,
79+
metric_prefix: None,
80+
},
81+
shutdown.clone(),
82+
)
83+
});
84+
85+
b.iter(|| {
86+
let _ = std::hint::black_box(sink.try_push(42));
87+
});
88+
89+
shutdown.cancel();
90+
});
91+
92+
group.finish();
93+
}
94+
95+
fn bench_actor_try_send(c: &mut Criterion) {
96+
let rt = Runtime::new().expect("tokio rt");
97+
let mut group = c.benchmark_group("actor");
98+
group.throughput(Throughput::Elements(1));
99+
100+
group.bench_function("try_send_unsaturated", |b| {
101+
let shutdown = CancellationToken::new();
102+
let (handle, _join) = rt.block_on(async {
103+
ActorHandle::spawn(
104+
CounterActor { sum: 0 },
105+
ActorConfig {
106+
queue_capacity: 1_000_000,
107+
idle_interval: Duration::from_mins(1),
108+
},
109+
shutdown.clone(),
110+
)
111+
});
112+
113+
b.iter(|| {
114+
let _ = std::hint::black_box(handle.try_send(42));
115+
});
116+
117+
shutdown.cancel();
118+
});
119+
120+
group.finish();
121+
}
122+
123+
criterion_group!(
124+
benches,
125+
bench_background_sink_try_push,
126+
bench_actor_try_send
127+
);
128+
criterion_main!(benches);

0 commit comments

Comments
 (0)