Skip to content

Commit 72c78a0

Browse files
authored
Stop producing values when there are no consumers (#6)
Closes #2
2 parents 805a3cb + b43fede commit 72c78a0

3 files changed

Lines changed: 59 additions & 20 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ on:
88

99
jobs:
1010
test:
11-
runs-on: ubuntu-22.04
11+
runs-on: ubuntu-24.04
1212

1313
steps:
1414
- name: Fetch sources
1515
uses: actions/checkout@v4
1616
with:
1717
submodules: recursive
1818

19+
- name: Install protoc
20+
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
21+
1922
- name: Run tests
2023
uses: frequenz-floss/gh-action-cargo-test@v1.0.0

src/client/microgrid_client_actor.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use crate::{
2626

2727
enum StreamStatus {
2828
Failed(u64),
29-
Succeeded(u64),
29+
Connected(u64),
30+
Ended(u64),
3031
}
3132

3233
/// This actor owns the connection to the microgrid API and processes instructions
@@ -82,7 +83,10 @@ impl MicrogridClientActor {
8283
RetryTracker::new
8384
).mark_new_failure();
8485
}
85-
Some(StreamStatus::Succeeded(component_id)) => {
86+
Some(StreamStatus::Connected(component_id)) => {
87+
components_to_retry.remove(&component_id);
88+
}
89+
Some(StreamStatus::Ended(component_id)) => {
8690
components_to_retry.remove(&component_id);
8791
}
8892
None => {
@@ -241,7 +245,7 @@ async fn start_component_data_stream(
241245
};
242246

243247
stream_stopped_tx
244-
.send(StreamStatus::Succeeded(component_id))
248+
.send(StreamStatus::Connected(component_id))
245249
.await
246250
.map_err(|e| {
247251
Error::connection_failure(format!(
@@ -263,6 +267,23 @@ async fn run_component_data_stream(
263267
stream_stopped_tx: mpsc::Sender<StreamStatus>,
264268
) {
265269
loop {
270+
if tx.receiver_count() == 0 {
271+
tracing::debug!(
272+
"Dropping ComponentData stream for component_id:{:?}",
273+
component_id
274+
);
275+
stream_stopped_tx
276+
.send(StreamStatus::Ended(component_id))
277+
.await
278+
.unwrap_or_else(|e| {
279+
tracing::error!(
280+
"Failed to send stream ended message for {:?}: {:?}",
281+
component_id,
282+
e
283+
);
284+
});
285+
return;
286+
}
266287
let message = match stream.message().await {
267288
Ok(m) => m,
268289
Err(e) => {
@@ -289,16 +310,8 @@ async fn run_component_data_stream(
289310
}
290311
};
291312

292-
match tx.send(data) {
293-
Ok(_) => {}
294-
Err(e) => {
295-
tracing::error!(
296-
"Unable to send component data for {:?}: {}. Closing stream.",
297-
component_id,
298-
e
299-
);
300-
break;
301-
}
313+
if tx.send(data).is_err() {
314+
continue;
302315
};
303316
}
304317

src/logical_meter/logical_meter_actor.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use chrono::{DateTime, TimeDelta, Timelike as _, Utc};
99
use frequenz_microgrid_formula_engine::FormulaEngine;
1010
use frequenz_resampling::ResamplingFunction;
11-
use std::collections::HashMap;
11+
use std::collections::{HashMap, HashSet};
1212
use tokio::sync::{broadcast, mpsc, oneshot};
1313
use tokio::time::{MissedTickBehavior, interval};
1414

@@ -218,14 +218,37 @@ impl LogicalMeterActor {
218218
comp_data.insert(resampler.component_id, resampled[0].clone().value());
219219
}
220220

221-
for (_, formula) in formulas.iter_mut() {
221+
let mut formulas_to_drop = vec![];
222+
for (formula_str, formula) in formulas.iter_mut() {
222223
let result = formula.formula.calculate(&comp_data).map_err(|e| {
223224
Error::formula_engine_error(format!("Failed to evaluate formula: {e}"))
224225
})?;
225-
formula
226-
.sender
227-
.send(Sample::new(self.next_ts, result))
228-
.map_err(|_| Error::internal("Failed to send sample for formula".to_string()))?;
226+
227+
if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) {
228+
tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}");
229+
formulas_to_drop.push(formula_str.to_string());
230+
}
231+
}
232+
233+
for formula_str in &formulas_to_drop {
234+
if let Some(formula) = formulas.remove(formula_str) {
235+
tracing::debug!("Dropping formula: {}", formula_str);
236+
drop(formula);
237+
}
238+
}
239+
if !formulas_to_drop.is_empty() {
240+
let mut components = HashSet::<u64>::new();
241+
for (_, formula) in formulas.iter() {
242+
components.extend(formula.formula.components());
243+
}
244+
resamplers.retain(|component_id, _| {
245+
if components.contains(component_id) {
246+
true
247+
} else {
248+
tracing::debug!("Dropping resampler for component {}", component_id);
249+
false
250+
}
251+
});
229252
}
230253

231254
self.next_ts += self.config.resampling_interval;

0 commit comments

Comments
 (0)