Stop producing values when there are no consumers#6
Conversation
This is more specific, and would make more sense when we add `Ended` as an additional status. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
There was a problem hiding this comment.
Pull Request Overview
This PR stops producing and forwarding samples when there are no active consumers by dropping unused formulas and resamplers, and refines stream status handling in the microgrid client.
- In
logical_meter_actor, track subscriber removal to drop formulas and their dependent resamplers. - In
microgrid_client_actor, renameSucceededtoConnected, introduceEnded, and adjust send logic to skip when no receivers. - Simplify streaming loops to rely on subscriber counts rather than error-driven breaks.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/logical_meter/logical_meter_actor.rs | Drop unused formulas and associated resamplers |
| src/client/microgrid_client_actor.rs | Update StreamStatus, send logic, and end-of-stream handling |
Comments suppressed due to low confidence (2)
src/logical_meter/logical_meter_actor.rs:221
- [nitpick] Consider renaming
formulas_to_droptoformulas_to_removeorkeys_to_removefor clearer intent.
let mut formulas_to_drop = vec![];
src/client/microgrid_client_actor.rs:270
- Add unit or integration tests to verify that the stream stops and the
Endedstatus is sent when all subscribers drop their receivers.
if tx.receiver_count() == 0 {
| let mut formulas_to_drop = vec![]; | ||
| for (formula_str, formula) in formulas.iter_mut() { | ||
| let result = formula.formula.calculate(&comp_data).map_err(|e| { | ||
| Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) | ||
| })?; | ||
| formula | ||
| .sender | ||
| .send(Sample::new(self.next_ts, result)) | ||
| .map_err(|_| Error::internal("Failed to send sample for formula".to_string()))?; | ||
|
|
||
| if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { | ||
| tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); | ||
| formulas_to_drop.push(formula_str.to_string()); | ||
| } | ||
| } | ||
|
|
||
| for formula_str in &formulas_to_drop { | ||
| if let Some(formula) = formulas.remove(formula_str) { | ||
| tracing::debug!("Dropping formula: {}", formula_str); | ||
| drop(formula); | ||
| } | ||
| } |
There was a problem hiding this comment.
[nitpick] You can simplify removal by using formulas.retain(|k, _| !formulas_to_drop.contains(k)) instead of collecting keys and looping twice, which reduces allocations and improves readability.
| let mut components = HashSet::<u64>::new(); | ||
| for (_, formula) in formulas.iter() { | ||
| components.extend(formula.formula.components()); | ||
| } |
There was a problem hiding this comment.
[nitpick] Consider extracting the component-collection logic into a helper function (e.g., collect_active_components(&formulas)) to improve clarity and make it easier to test in isolation.
| let mut components = HashSet::<u64>::new(); | |
| for (_, formula) in formulas.iter() { | |
| components.extend(formula.formula.components()); | |
| } | |
| let components = collect_active_components(formulas); |
| Connected(u64), | ||
| Ended(u64), |
There was a problem hiding this comment.
[nitpick] Since Connected and Ended variants are currently handled the same way, consider either consolidating them or adding comments that explain the semantic difference to future readers.
| Connected(u64), | |
| Ended(u64), | |
| Inactive(u64), |
| ); | ||
| break; | ||
| } | ||
| if tx.send(data).is_err() { |
There was a problem hiding this comment.
Silently continuing on send errors may hide unexpected conditions; consider logging a debug message when tx.send fails to help diagnose issues in production.
| if tx.send(data).is_err() { | |
| if tx.send(data).is_err() { | |
| tracing::debug!( | |
| "Failed to send data for component_id: {:?}. Skipping to next iteration.", | |
| component_id | |
| ); |
Also switch to a newer version of Ubuntu so that we can get a recent enough version of the protoc compiler. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Closes #2