diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 868e4a6..1b45300 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -8,7 +8,7 @@ on: jobs: test: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - name: Fetch sources @@ -16,5 +16,8 @@ jobs: with: submodules: recursive + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + - name: Run tests uses: frequenz-floss/gh-action-cargo-test@v1.0.0 diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index c8f0f6c..c0085a8 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -26,7 +26,8 @@ use crate::{ enum StreamStatus { Failed(u64), - Succeeded(u64), + Connected(u64), + Ended(u64), } /// This actor owns the connection to the microgrid API and processes instructions @@ -82,7 +83,10 @@ impl MicrogridClientActor { RetryTracker::new ).mark_new_failure(); } - Some(StreamStatus::Succeeded(component_id)) => { + Some(StreamStatus::Connected(component_id)) => { + components_to_retry.remove(&component_id); + } + Some(StreamStatus::Ended(component_id)) => { components_to_retry.remove(&component_id); } None => { @@ -241,7 +245,7 @@ async fn start_component_data_stream( }; stream_stopped_tx - .send(StreamStatus::Succeeded(component_id)) + .send(StreamStatus::Connected(component_id)) .await .map_err(|e| { Error::connection_failure(format!( @@ -263,6 +267,23 @@ async fn run_component_data_stream( stream_stopped_tx: mpsc::Sender, ) { loop { + if tx.receiver_count() == 0 { + tracing::debug!( + "Dropping ComponentData stream for component_id:{:?}", + component_id + ); + stream_stopped_tx + .send(StreamStatus::Ended(component_id)) + .await + .unwrap_or_else(|e| { + tracing::error!( + "Failed to send stream ended message for {:?}: {:?}", + component_id, + e + ); + }); + return; + } let message = match stream.message().await { Ok(m) => m, Err(e) => { @@ -289,16 +310,8 @@ async fn run_component_data_stream( } }; - match tx.send(data) { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Unable to send component data for {:?}: {}. Closing stream.", - component_id, - e - ); - break; - } + if tx.send(data).is_err() { + continue; }; } diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 123b996..c6c0023 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -8,7 +8,7 @@ use chrono::{DateTime, TimeDelta, Timelike as _, Utc}; use frequenz_microgrid_formula_engine::FormulaEngine; use frequenz_resampling::ResamplingFunction; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; @@ -218,14 +218,37 @@ impl LogicalMeterActor { comp_data.insert(resampler.component_id, resampled[0].clone().value()); } - for (_, formula) in formulas.iter_mut() { + let mut formulas_to_drop = vec![]; + for (formula_str, formula) in formulas.iter_mut() { let result = formula.formula.calculate(&comp_data).map_err(|e| { Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) })?; - formula - .sender - .send(Sample::new(self.next_ts, result)) - .map_err(|_| Error::internal("Failed to send sample for formula".to_string()))?; + + if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { + tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); + formulas_to_drop.push(formula_str.to_string()); + } + } + + for formula_str in &formulas_to_drop { + if let Some(formula) = formulas.remove(formula_str) { + tracing::debug!("Dropping formula: {}", formula_str); + drop(formula); + } + } + if !formulas_to_drop.is_empty() { + let mut components = HashSet::::new(); + for (_, formula) in formulas.iter() { + components.extend(formula.formula.components()); + } + resamplers.retain(|component_id, _| { + if components.contains(component_id) { + true + } else { + tracing::debug!("Dropping resampler for component {}", component_id); + false + } + }); } self.next_ts += self.config.resampling_interval;