Skip to content

Commit 88a5c28

Browse files
committed
Refactor stream polling into a function
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent dc4bf9f commit 88a5c28

1 file changed

Lines changed: 26 additions & 24 deletions

File tree

src/logical_meter/logical_meter_actor.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,26 @@ struct ComponentDataResampler {
3434
receiver: broadcast::Receiver<ElectricalComponentTelemetry>,
3535
}
3636

37+
/// Polls the broadcast receiver once, logging `Lagged` as debug and
38+
/// retrying. Returns `Some(data)` with the next sample, or `None` on
39+
/// `Empty` / `Closed`. `Lagged` can happen when the server bursts enough
40+
/// samples during a wall-clock jump to fill the channel buffer.
41+
fn poll_telemetry(
42+
receiver: &mut broadcast::Receiver<ElectricalComponentTelemetry>,
43+
component_id: u64,
44+
) -> Option<ElectricalComponentTelemetry> {
45+
loop {
46+
match receiver.try_recv() {
47+
Ok(data) => return Some(data),
48+
Err(tokio::sync::broadcast::error::TryRecvError::Empty) => return None,
49+
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => {
50+
tracing::debug!("resampler receiver lagged {n} samples for cid={component_id}");
51+
}
52+
Err(tokio::sync::broadcast::error::TryRecvError::Closed) => return None,
53+
}
54+
}
55+
}
56+
3757
/// Used to send strongly-typed formula streams from the LogicalMeterActor back
3858
/// to the Handle.
3959
pub(crate) enum TypedFormulaResponseSender {
@@ -479,21 +499,10 @@ impl<C: Clock> LogicalMeterActor<C> {
479499
let mut resampled_metrics: HashMap<Metric, HashMap<u64, Option<f32>>> = HashMap::new();
480500

481501
for (_, resampler) in resamplers.iter_mut() {
482-
loop {
483-
match resampler.receiver.try_recv() {
484-
Ok(data) => self.push_to_resampler(resampler, data, resampler.metric),
485-
Err(tokio::sync::broadcast::error::TryRecvError::Empty) => break,
486-
// On a wall-clock jump the server may burst enough samples
487-
// to fill the broadcast buffer faster than we drain it;
488-
// `Lagged` means we fell behind — skip and keep draining.
489-
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => {
490-
tracing::debug!(
491-
"resampler receiver lagged {n} samples for cid={}",
492-
resampler.component_id
493-
);
494-
}
495-
Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break,
496-
}
502+
while let Some(data) =
503+
poll_telemetry(&mut resampler.receiver, resampler.component_id)
504+
{
505+
self.push_to_resampler(resampler, data, resampler.metric);
497506
}
498507
let resampled = resampler.resampler.resample(self.resampler_ts);
499508
if resampled.len() != 1 {
@@ -525,15 +534,8 @@ impl<C: Clock> LogicalMeterActor<C> {
525534
for resampler in resamplers.values_mut() {
526535
// Drain any samples that were queued during the jump window;
527536
// they are timestamped on the old wall-clock frame and would
528-
// pollute the freshly-aligned resampler. `Lagged` is fine — it
529-
// just means the receiver fell behind; skip and keep draining.
530-
loop {
531-
match resampler.receiver.try_recv() {
532-
Ok(_) => continue,
533-
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
534-
_ => break,
535-
}
536-
}
537+
// pollute the freshly-aligned resampler.
538+
while poll_telemetry(&mut resampler.receiver, resampler.component_id).is_some() {}
537539
let function = self
538540
.config
539541
.resampling_overrides

0 commit comments

Comments
 (0)