From fc44a93ebc481792873738504628a7639371b041 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 14 Jul 2025 16:03:15 +0200 Subject: [PATCH 1/5] Rename `Succeeded` to `Connected` This is more specific, and would make more sense when we add `Ended` as an additional status. Signed-off-by: Sahas Subramanian --- src/client/microgrid_client_actor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index c8f0f6c..ba98dd4 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -26,7 +26,7 @@ use crate::{ enum StreamStatus { Failed(u64), - Succeeded(u64), + Connected(u64), } /// This actor owns the connection to the microgrid API and processes instructions @@ -82,7 +82,7 @@ impl MicrogridClientActor { RetryTracker::new ).mark_new_failure(); } - Some(StreamStatus::Succeeded(component_id)) => { + Some(StreamStatus::Connected(component_id)) => { components_to_retry.remove(&component_id); } None => { @@ -241,7 +241,7 @@ async fn start_component_data_stream( }; stream_stopped_tx - .send(StreamStatus::Succeeded(component_id)) + .send(StreamStatus::Connected(component_id)) .await .map_err(|e| { Error::connection_failure(format!( From dc271b5505f31fe074894fc4c05ed74ef39ec910 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 10 Jul 2025 14:21:38 +0200 Subject: [PATCH 2/5] Drop formulas with no subscribers Signed-off-by: Sahas Subramanian --- src/logical_meter/logical_meter_actor.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 123b996..738c425 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -218,14 +218,23 @@ impl LogicalMeterActor { comp_data.insert(resampler.component_id, resampled[0].clone().value()); } - for (_, formula) in formulas.iter_mut() { + let mut formulas_to_drop = vec![]; + for (formula_str, formula) in formulas.iter_mut() { let result = formula.formula.calculate(&comp_data).map_err(|e| { Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) })?; - formula - .sender - .send(Sample::new(self.next_ts, result)) - .map_err(|_| Error::internal("Failed to send sample for formula".to_string()))?; + + if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { + tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); + formulas_to_drop.push(formula_str.to_string()); + } + } + + for formula_str in &formulas_to_drop { + if let Some(formula) = formulas.remove(formula_str) { + tracing::debug!("Dropping formula: {}", formula_str); + drop(formula); + } } self.next_ts += self.config.resampling_interval; From 569b1107dd33d5f35d400f83c11bc7bf59f56181 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 14 Jul 2025 16:25:31 +0200 Subject: [PATCH 3/5] Drop unused resamplers Signed-off-by: Sahas Subramanian --- src/logical_meter/logical_meter_actor.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 738c425..c6c0023 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -8,7 +8,7 @@ use chrono::{DateTime, TimeDelta, Timelike as _, Utc}; use frequenz_microgrid_formula_engine::FormulaEngine; use frequenz_resampling::ResamplingFunction; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; @@ -236,6 +236,20 @@ impl LogicalMeterActor { drop(formula); } } + if !formulas_to_drop.is_empty() { + let mut components = HashSet::::new(); + for (_, formula) in formulas.iter() { + components.extend(formula.formula.components()); + } + resamplers.retain(|component_id, _| { + if components.contains(component_id) { + true + } else { + tracing::debug!("Dropping resampler for component {}", component_id); + false + } + }); + } self.next_ts += self.config.resampling_interval; Ok(()) From 48eee279996de1096c0547ec4bbc2f4ffd85c36d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 14 Jul 2025 16:19:57 +0200 Subject: [PATCH 4/5] End ComponentData streams that have no receivers Signed-off-by: Sahas Subramanian --- src/client/microgrid_client_actor.rs | 33 +++++++++++++++++++--------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index ba98dd4..c0085a8 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -27,6 +27,7 @@ use crate::{ enum StreamStatus { Failed(u64), Connected(u64), + Ended(u64), } /// This actor owns the connection to the microgrid API and processes instructions @@ -85,6 +86,9 @@ impl MicrogridClientActor { Some(StreamStatus::Connected(component_id)) => { components_to_retry.remove(&component_id); } + Some(StreamStatus::Ended(component_id)) => { + components_to_retry.remove(&component_id); + } None => { tracing::error!("MicrogridClientActor: Stream status channel closed, exiting."); return; @@ -263,6 +267,23 @@ async fn run_component_data_stream( stream_stopped_tx: mpsc::Sender, ) { loop { + if tx.receiver_count() == 0 { + tracing::debug!( + "Dropping ComponentData stream for component_id:{:?}", + component_id + ); + stream_stopped_tx + .send(StreamStatus::Ended(component_id)) + .await + .unwrap_or_else(|e| { + tracing::error!( + "Failed to send stream ended message for {:?}: {:?}", + component_id, + e + ); + }); + return; + } let message = match stream.message().await { Ok(m) => m, Err(e) => { @@ -289,16 +310,8 @@ async fn run_component_data_stream( } }; - match tx.send(data) { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Unable to send component data for {:?}: {}. Closing stream.", - component_id, - e - ); - break; - } + if tx.send(data).is_err() { + continue; }; } From b43fede8d2399f7fd23a921e5cef09f132c7771e Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 15 Jul 2025 17:22:47 +0200 Subject: [PATCH 5/5] Install protoc in CI Also switch to a newer version of Ubuntu so that we can get a recent enough version of the protoc compiler. Signed-off-by: Sahas Subramanian --- .github/workflows/ci.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 868e4a6..1b45300 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -8,7 +8,7 @@ on: jobs: test: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - name: Fetch sources @@ -16,5 +16,8 @@ jobs: with: submodules: recursive + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + - name: Run tests uses: frequenz-floss/gh-action-cargo-test@v1.0.0