-
Notifications
You must be signed in to change notification settings - Fork 3
Stop producing values when there are no consumers #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,7 +26,8 @@ use crate::{ | |||||||||||||
|
|
||||||||||||||
| enum StreamStatus { | ||||||||||||||
| Failed(u64), | ||||||||||||||
| Succeeded(u64), | ||||||||||||||
| Connected(u64), | ||||||||||||||
| Ended(u64), | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /// This actor owns the connection to the microgrid API and processes instructions | ||||||||||||||
|
|
@@ -82,7 +83,10 @@ impl MicrogridClientActor { | |||||||||||||
| RetryTracker::new | ||||||||||||||
| ).mark_new_failure(); | ||||||||||||||
| } | ||||||||||||||
| Some(StreamStatus::Succeeded(component_id)) => { | ||||||||||||||
| Some(StreamStatus::Connected(component_id)) => { | ||||||||||||||
| components_to_retry.remove(&component_id); | ||||||||||||||
| } | ||||||||||||||
| Some(StreamStatus::Ended(component_id)) => { | ||||||||||||||
| components_to_retry.remove(&component_id); | ||||||||||||||
| } | ||||||||||||||
| None => { | ||||||||||||||
|
|
@@ -241,7 +245,7 @@ async fn start_component_data_stream( | |||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| stream_stopped_tx | ||||||||||||||
| .send(StreamStatus::Succeeded(component_id)) | ||||||||||||||
| .send(StreamStatus::Connected(component_id)) | ||||||||||||||
| .await | ||||||||||||||
| .map_err(|e| { | ||||||||||||||
| Error::connection_failure(format!( | ||||||||||||||
|
|
@@ -263,6 +267,23 @@ async fn run_component_data_stream( | |||||||||||||
| stream_stopped_tx: mpsc::Sender<StreamStatus>, | ||||||||||||||
| ) { | ||||||||||||||
| loop { | ||||||||||||||
| if tx.receiver_count() == 0 { | ||||||||||||||
| tracing::debug!( | ||||||||||||||
| "Dropping ComponentData stream for component_id:{:?}", | ||||||||||||||
| component_id | ||||||||||||||
| ); | ||||||||||||||
| stream_stopped_tx | ||||||||||||||
| .send(StreamStatus::Ended(component_id)) | ||||||||||||||
| .await | ||||||||||||||
| .unwrap_or_else(|e| { | ||||||||||||||
| tracing::error!( | ||||||||||||||
| "Failed to send stream ended message for {:?}: {:?}", | ||||||||||||||
| component_id, | ||||||||||||||
| e | ||||||||||||||
| ); | ||||||||||||||
| }); | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| let message = match stream.message().await { | ||||||||||||||
| Ok(m) => m, | ||||||||||||||
| Err(e) => { | ||||||||||||||
|
|
@@ -289,16 +310,8 @@ async fn run_component_data_stream( | |||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| match tx.send(data) { | ||||||||||||||
| Ok(_) => {} | ||||||||||||||
| Err(e) => { | ||||||||||||||
| tracing::error!( | ||||||||||||||
| "Unable to send component data for {:?}: {}. Closing stream.", | ||||||||||||||
| component_id, | ||||||||||||||
| e | ||||||||||||||
| ); | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| if tx.send(data).is_err() { | ||||||||||||||
|
||||||||||||||
| if tx.send(data).is_err() { | |
| if tx.send(data).is_err() { | |
| tracing::debug!( | |
| "Failed to send data for component_id: {:?}. Skipping to next iteration.", | |
| component_id | |
| ); |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |||||||||||
| use chrono::{DateTime, TimeDelta, Timelike as _, Utc}; | ||||||||||||
| use frequenz_microgrid_formula_engine::FormulaEngine; | ||||||||||||
| use frequenz_resampling::ResamplingFunction; | ||||||||||||
| use std::collections::HashMap; | ||||||||||||
| use std::collections::{HashMap, HashSet}; | ||||||||||||
| use tokio::sync::{broadcast, mpsc, oneshot}; | ||||||||||||
| use tokio::time::{MissedTickBehavior, interval}; | ||||||||||||
|
|
||||||||||||
|
|
@@ -218,14 +218,37 @@ impl LogicalMeterActor { | |||||||||||
| comp_data.insert(resampler.component_id, resampled[0].clone().value()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| for (_, formula) in formulas.iter_mut() { | ||||||||||||
| let mut formulas_to_drop = vec![]; | ||||||||||||
| for (formula_str, formula) in formulas.iter_mut() { | ||||||||||||
| let result = formula.formula.calculate(&comp_data).map_err(|e| { | ||||||||||||
| Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) | ||||||||||||
| })?; | ||||||||||||
| formula | ||||||||||||
| .sender | ||||||||||||
| .send(Sample::new(self.next_ts, result)) | ||||||||||||
| .map_err(|_| Error::internal("Failed to send sample for formula".to_string()))?; | ||||||||||||
|
|
||||||||||||
| if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { | ||||||||||||
| tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); | ||||||||||||
| formulas_to_drop.push(formula_str.to_string()); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| for formula_str in &formulas_to_drop { | ||||||||||||
| if let Some(formula) = formulas.remove(formula_str) { | ||||||||||||
| tracing::debug!("Dropping formula: {}", formula_str); | ||||||||||||
| drop(formula); | ||||||||||||
|
niklas-timpe marked this conversation as resolved.
|
||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+221
to
+238
|
||||||||||||
| if !formulas_to_drop.is_empty() { | ||||||||||||
| let mut components = HashSet::<u64>::new(); | ||||||||||||
| for (_, formula) in formulas.iter() { | ||||||||||||
| components.extend(formula.formula.components()); | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+240
to
+243
|
||||||||||||
| let mut components = HashSet::<u64>::new(); | |
| for (_, formula) in formulas.iter() { | |
| components.extend(formula.formula.components()); | |
| } | |
| let components = collect_active_components(formulas); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Since
ConnectedandEndedvariants are currently handled the same way, consider either consolidating them or adding comments that explain the semantic difference to future readers.