Skip to content

Commit 311af18

Browse files
committed
fix: add BatchAccumulator for bounded batch drain with time/count/bytes thresholds
1 parent 00fe7f5 commit 311af18

3 files changed

Lines changed: 384 additions & 2 deletions

File tree

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,9 @@ pub use scaling::{
237237

238238
#[cfg(feature = "worker")]
239239
pub use worker::{
240-
AdaptiveWorkerPool, BatchPipeline, BatchProcessor, PipelineStats, PipelineStatsSnapshot,
241-
ScalingDecision, ScalingInput, WorkerPoolConfig,
240+
AccumulatorConfig, AccumulatorFull, AdaptiveWorkerPool, BatchAccumulator, BatchDrainer,
241+
BatchPipeline, BatchProcessor, PipelineStats, PipelineStatsSnapshot, ScalingDecision,
242+
ScalingInput, WorkerPoolConfig,
242243
};
243244

244245
#[cfg(feature = "cli")]

src/worker/accumulator.rs

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
// Project: hyperi-rustlib
2+
// File: src/worker/accumulator.rs
3+
// Purpose: Bounded batch accumulator with time/count/bytes drain thresholds
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! Bounded batch accumulator for DFE pipeline batching.
10+
//!
11+
//! Accumulates items from multiple producers (HTTP handlers, gRPC handlers, etc.)
12+
//! and drains them as batches when any threshold is met:
13+
//! - Item count reaches `max_items`
14+
//! - Byte count reaches `max_bytes`
15+
//! - Time since last drain reaches `max_wait`
16+
//!
17+
//! Bounded — pushers get an error when the channel is full (backpressure).
18+
//! Shutdown-safe — `drain_remaining()` flushes buffered items.
19+
//!
20+
//! ## Example
21+
//!
22+
//! ```rust,ignore
23+
//! use hyperi_rustlib::worker::BatchAccumulator;
24+
//! use std::time::Duration;
25+
//!
26+
//! let (acc, mut drainer) = BatchAccumulator::new(
27+
//! 1000, // channel capacity (backpressure bound)
28+
//! 100, // max items per batch
29+
//! 1024 * 1024, // max bytes per batch (1MB)
30+
//! Duration::from_millis(10), // max wait before flush
31+
//! );
32+
//!
33+
//! // Producers push (from HTTP handlers, etc.)
34+
//! acc.push(payload, payload.len()).await?;
35+
//!
36+
//! // Consumer drains batches (background task)
37+
//! loop {
38+
//! let batch = drainer.next_batch().await;
39+
//! if batch.is_empty() { break; } // shutdown
40+
//! process_batch(&batch);
41+
//! }
42+
//! ```
43+
44+
use std::time::Duration;
45+
46+
use tokio::sync::mpsc;
47+
48+
/// Accumulator configuration.
49+
#[derive(Debug, Clone)]
50+
pub struct AccumulatorConfig {
51+
/// Channel capacity (bounded — pushers get error when full).
52+
pub channel_capacity: usize,
53+
/// Maximum items per batch before auto-drain.
54+
pub max_items: usize,
55+
/// Maximum accumulated bytes per batch before auto-drain.
56+
pub max_bytes: usize,
57+
/// Maximum time since last drain before auto-flush.
58+
pub max_wait: Duration,
59+
}
60+
61+
impl Default for AccumulatorConfig {
62+
fn default() -> Self {
63+
Self {
64+
channel_capacity: 10_000,
65+
max_items: 100,
66+
max_bytes: 1024 * 1024, // 1MB
67+
max_wait: Duration::from_millis(10),
68+
}
69+
}
70+
}
71+
72+
/// Push handle — cloneable, used by producers to send items into the accumulator.
73+
#[derive(Clone)]
74+
pub struct BatchAccumulator<T> {
75+
tx: mpsc::Sender<(T, usize)>, // (item, byte_size)
76+
}
77+
78+
/// Drain handle — used by a single consumer to receive batches.
79+
pub struct BatchDrainer<T> {
80+
rx: mpsc::Receiver<(T, usize)>,
81+
config: AccumulatorConfig,
82+
buffer: Vec<T>,
83+
buffer_bytes: usize,
84+
}
85+
86+
/// Error when the accumulator channel is full (backpressure).
87+
#[derive(Debug, thiserror::Error)]
88+
#[error("accumulator full — backpressure active ({capacity} items buffered)")]
89+
pub struct AccumulatorFull {
90+
pub capacity: usize,
91+
}
92+
93+
impl<T: Send + 'static> BatchAccumulator<T> {
94+
/// Create a new accumulator + drainer pair.
95+
///
96+
/// Returns `(push_handle, drain_handle)`. The push handle is `Clone` for
97+
/// sharing across HTTP/gRPC handlers. The drain handle is used by a single
98+
/// background task to receive batches.
99+
#[must_use]
100+
pub fn new(config: AccumulatorConfig) -> (Self, BatchDrainer<T>) {
101+
let (tx, rx) = mpsc::channel(config.channel_capacity);
102+
let drainer = BatchDrainer {
103+
rx,
104+
buffer: Vec::with_capacity(config.max_items),
105+
buffer_bytes: 0,
106+
config: config.clone(),
107+
};
108+
(Self { tx }, drainer)
109+
}
110+
111+
/// Push an item into the accumulator.
112+
///
113+
/// `byte_size` is used for the bytes threshold. Pass `payload.len()`.
114+
///
115+
/// # Errors
116+
///
117+
/// Returns `AccumulatorFull` if the channel is at capacity (backpressure).
118+
pub async fn push(&self, item: T, byte_size: usize) -> Result<(), AccumulatorFull> {
119+
self.tx
120+
.try_send((item, byte_size))
121+
.map_err(|_| AccumulatorFull {
122+
capacity: self.tx.capacity(),
123+
})
124+
}
125+
126+
/// Check if the accumulator has been closed (drainer dropped).
127+
pub fn is_closed(&self) -> bool {
128+
self.tx.is_closed()
129+
}
130+
}
131+
132+
impl<T> BatchDrainer<T> {
133+
/// Wait for the next batch.
134+
///
135+
/// Blocks until any threshold is met (items, bytes, or time). Returns
136+
/// an empty vec when the channel is closed (all producers dropped = shutdown).
137+
pub async fn next_batch(&mut self) -> Vec<T> {
138+
// If buffer already meets a threshold, drain immediately
139+
if self.threshold_met() {
140+
return self.take_buffer();
141+
}
142+
143+
// Wait for items with a timeout
144+
loop {
145+
let timeout = tokio::time::sleep(self.config.max_wait);
146+
147+
tokio::select! {
148+
biased;
149+
150+
// Time threshold — flush whatever we have
151+
() = timeout => {
152+
if self.buffer.is_empty() {
153+
// No items at all — keep waiting (don't return empty batch)
154+
continue;
155+
}
156+
return self.take_buffer();
157+
}
158+
159+
// New item arrived
160+
item = self.rx.recv() => {
161+
match item {
162+
Some((val, size)) => {
163+
self.buffer_bytes += size;
164+
self.buffer.push(val);
165+
if self.threshold_met() {
166+
return self.take_buffer();
167+
}
168+
}
169+
None => {
170+
// Channel closed — drain remaining
171+
return self.take_buffer();
172+
}
173+
}
174+
}
175+
}
176+
}
177+
}
178+
179+
/// Drain any remaining buffered items (for graceful shutdown).
180+
pub fn drain_remaining(&mut self) -> Vec<T> {
181+
// Drain channel
182+
while let Ok((val, size)) = self.rx.try_recv() {
183+
self.buffer_bytes += size;
184+
self.buffer.push(val);
185+
}
186+
self.take_buffer()
187+
}
188+
189+
fn threshold_met(&self) -> bool {
190+
self.buffer.len() >= self.config.max_items || self.buffer_bytes >= self.config.max_bytes
191+
}
192+
193+
fn take_buffer(&mut self) -> Vec<T> {
194+
self.buffer_bytes = 0;
195+
std::mem::take(&mut self.buffer)
196+
}
197+
}
198+
199+
#[cfg(test)]
200+
mod tests {
201+
use super::*;
202+
203+
#[tokio::test]
204+
async fn test_drain_on_item_count() {
205+
let config = AccumulatorConfig {
206+
channel_capacity: 100,
207+
max_items: 5,
208+
max_bytes: usize::MAX,
209+
max_wait: Duration::from_secs(60), // won't trigger
210+
};
211+
let (acc, mut drainer) = BatchAccumulator::new(config);
212+
213+
// Push 5 items — should trigger drain
214+
for i in 0..5 {
215+
acc.push(i, 1).await.unwrap();
216+
}
217+
218+
let batch = drainer.next_batch().await;
219+
assert_eq!(batch.len(), 5);
220+
assert_eq!(batch, vec![0, 1, 2, 3, 4]);
221+
}
222+
223+
#[tokio::test]
224+
async fn test_drain_on_byte_threshold() {
225+
let config = AccumulatorConfig {
226+
channel_capacity: 100,
227+
max_items: 1000, // won't trigger
228+
max_bytes: 10, // trigger at 10 bytes
229+
max_wait: Duration::from_secs(60),
230+
};
231+
let (acc, mut drainer) = BatchAccumulator::new(config);
232+
233+
// Push items with size=3 each — 4 items = 12 bytes > 10 threshold
234+
for i in 0..4 {
235+
acc.push(i, 3).await.unwrap();
236+
}
237+
238+
let batch = drainer.next_batch().await;
239+
assert_eq!(batch.len(), 4);
240+
}
241+
242+
#[tokio::test]
243+
async fn test_drain_on_time_threshold() {
244+
let config = AccumulatorConfig {
245+
channel_capacity: 100,
246+
max_items: 1000,
247+
max_bytes: usize::MAX,
248+
max_wait: Duration::from_millis(50), // 50ms
249+
};
250+
let (acc, mut drainer) = BatchAccumulator::new(config);
251+
252+
// Push 2 items (below count/byte threshold)
253+
acc.push(1, 1).await.unwrap();
254+
acc.push(2, 1).await.unwrap();
255+
256+
// Drain should fire after 50ms timeout
257+
let batch = drainer.next_batch().await;
258+
assert_eq!(batch.len(), 2);
259+
}
260+
261+
#[tokio::test]
262+
async fn test_backpressure_when_full() {
263+
let config = AccumulatorConfig {
264+
channel_capacity: 3,
265+
max_items: 100,
266+
max_bytes: usize::MAX,
267+
max_wait: Duration::from_secs(60),
268+
};
269+
let (acc, _drainer) = BatchAccumulator::<i32>::new(config);
270+
271+
// Fill to capacity
272+
acc.push(1, 1).await.unwrap();
273+
acc.push(2, 1).await.unwrap();
274+
acc.push(3, 1).await.unwrap();
275+
276+
// Next push should fail (backpressure)
277+
let result = acc.push(4, 1).await;
278+
assert!(result.is_err());
279+
}
280+
281+
#[tokio::test]
282+
async fn test_shutdown_drains_remaining() {
283+
let config = AccumulatorConfig {
284+
channel_capacity: 100,
285+
max_items: 1000,
286+
max_bytes: usize::MAX,
287+
max_wait: Duration::from_secs(60),
288+
};
289+
let (acc, mut drainer) = BatchAccumulator::new(config);
290+
291+
acc.push(10, 1).await.unwrap();
292+
acc.push(20, 1).await.unwrap();
293+
294+
// Drop the push handle (simulates shutdown)
295+
drop(acc);
296+
297+
// next_batch should return remaining items
298+
let batch = drainer.next_batch().await;
299+
assert_eq!(batch, vec![10, 20]);
300+
301+
// Subsequent call returns empty (channel closed)
302+
let batch = drainer.next_batch().await;
303+
assert!(batch.is_empty());
304+
}
305+
306+
#[tokio::test]
307+
async fn test_multiple_batches() {
308+
let config = AccumulatorConfig {
309+
channel_capacity: 100,
310+
max_items: 3,
311+
max_bytes: usize::MAX,
312+
max_wait: Duration::from_secs(60),
313+
};
314+
let (acc, mut drainer) = BatchAccumulator::new(config);
315+
316+
// Push 7 items — should produce 2 full batches + 1 partial
317+
for i in 0..7 {
318+
acc.push(i, 1).await.unwrap();
319+
}
320+
drop(acc); // signal shutdown to drain the last partial
321+
322+
let b1 = drainer.next_batch().await;
323+
assert_eq!(b1.len(), 3);
324+
325+
let b2 = drainer.next_batch().await;
326+
assert_eq!(b2.len(), 3);
327+
328+
let b3 = drainer.next_batch().await;
329+
assert_eq!(b3.len(), 1); // remaining partial batch
330+
331+
let b4 = drainer.next_batch().await;
332+
assert!(b4.is_empty()); // channel closed
333+
}
334+
335+
#[tokio::test]
336+
async fn test_push_handle_is_clone() {
337+
let config = AccumulatorConfig::default();
338+
let (acc, mut drainer) = BatchAccumulator::new(config);
339+
340+
let acc2 = acc.clone();
341+
342+
acc.push(1, 1).await.unwrap();
343+
acc2.push(2, 1).await.unwrap();
344+
345+
drop(acc);
346+
drop(acc2);
347+
348+
let batch = drainer.next_batch().await;
349+
assert_eq!(batch.len(), 2);
350+
}
351+
352+
#[tokio::test]
353+
async fn test_drain_remaining_on_shutdown() {
354+
let config = AccumulatorConfig {
355+
channel_capacity: 100,
356+
max_items: 1000,
357+
max_bytes: usize::MAX,
358+
max_wait: Duration::from_secs(60),
359+
};
360+
let (acc, mut drainer) = BatchAccumulator::new(config);
361+
362+
acc.push(1, 1).await.unwrap();
363+
acc.push(2, 1).await.unwrap();
364+
acc.push(3, 1).await.unwrap();
365+
drop(acc);
366+
367+
let remaining = drainer.drain_remaining();
368+
assert_eq!(remaining, vec![1, 2, 3]);
369+
}
370+
371+
#[tokio::test]
372+
async fn test_empty_drain_returns_empty() {
373+
let config = AccumulatorConfig::default();
374+
let (_acc, mut drainer) = BatchAccumulator::<i32>::new(config);
375+
376+
let remaining = drainer.drain_remaining();
377+
assert!(remaining.is_empty());
378+
}
379+
}

0 commit comments

Comments
 (0)