Fix telemetry and bounds tracker task leaks when consumers drop#50
Fix telemetry and bounds tracker task leaks when consumers drop#50shsms wants to merge 7 commits into
Conversation
The run loop logged "Failed to send component status" on every sample and every missing-data tick once the pool tracker dropped its mpsc receiver, but never exited, leaking the task and its broadcast subscription for the life of the process (scaling with each pool recreation). Break the loop on send failure, matching the RecvError::Closed arm, so the tracker shuts down when there is nothing left to report to. Fixes frequenz-floss#43. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`RecvError::Closed` means the upstream telemetry tracker dropped its sender, which happens on a normal teardown of the pool — the bounds tracker has nothing left to aggregate. Log it at debug rather than error, matching the no-receivers path just above. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The tick arm skips `send()` whenever the partitioning is unchanged, but a failed `send()` is the only thing that tells the tracker every receiver has gone. With a stable partition (e.g. all components silent, each re-emitting the same unhealthy status) the snapshot never changes, so the skip path is taken every tick and the dropped receivers are never noticed — the tracker and its component trackers leak until process exit. Check `receiver_count()` each tick, before the unchanged-skip, and break when it reaches zero. Applies to both the PV and battery pool telemetry trackers. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Both pool telemetry trackers now treat every exit from `run` as a normal shutdown: the loop ends when every component tracker has gone or every receiver has dropped, neither of which is an error. The trailing `error!` + `Err` return became a `debug!`, and the in-loop send-failure no longer logs at error. With shutdown no longer fallible, `run` has nothing left to report, so drop its `Result` return (the call sites only `tokio::spawn` it and discard the result). The remaining startup failures -- an empty component set and opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The group tracker logged at error and returned `Err` on three paths that are all ordinary shutdown: its inverter or battery component trackers all exiting (`recv` yields `None`), and the pool tracker dropping its receiver (the send to it fails). The last one fired on every `BatteryPool` teardown, so each group tracker spammed "Failed to send inverter-battery group status" at error level. Log these at debug and just return, matching how the pool telemetry trackers already treat their own shutdown. With shutdown no longer an error, `run` has nothing left to fail with, so drop its `Result` return; the remaining startup failures -- opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The per-component stream task exits once it sees no receivers and reports `StreamStatus::Ended`, but the actor only removed the component from the retry map — the now-dead `broadcast::Sender` stayed cached in `component_streams`. Any later subscription for that component was handed `tx.subscribe()` on that dead sender without a new tonic stream being started, so it never received telemetry. With the telemetry trackers now exiting cleanly instead of leaking (which used to hold receivers forever and mask this), every drop-pool/rebuild cycle hit it: the rebuilt pool saw its components as permanently silent. On `Ended`, drop the cached sender so the next subscription starts a fresh stream. If a subscriber arrived in the window between the stream task's no-receivers check and the actor processing `Ended`, the entry still has receivers — restart the stream for them instead of evicting it, since the old task is already gone either way. Fixes frequenz-floss#42. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
5f2e4db to
606e4e1
Compare
llucax
left a comment
There was a problem hiding this comment.
🤖 rAIview (AI review here!)
I found three issues worth addressing before merge: two lifecycle edge cases where tasks/cache entries can still survive after consumers drop or resubscriptions can stall, and one public API compatibility break from changing exported tracker run() methods to return ().
| } | ||
| Some(StreamStatus::Ended(component_id)) => { | ||
| components_to_retry.remove(&component_id); | ||
| match component_streams.get(&component_id) { |
There was a problem hiding this comment.
🟡 Idle component streams never send Ended
The new eviction path only runs after the actor receives StreamStatus::Ended, but the stream task sends Ended only from the top of its loop, before it awaits the next tonic item. If the last receiver drops while the tonic stream is idle but still open, the task is already parked in stream.next().await; it never rechecks tx.receiver_count(), never sends Ended, and component_streams keeps the stale sender. A later subscription then takes tx.subscribe() on that stale sender instead of creating a fresh API stream.
The new test exercises a stream that still has another sample after the receiver is dropped, so the task wakes up and reaches the receiver-count check. It does not cover the idle-open stream case.
Impact
A pool recreated after its component streams have gone silent can still get a cached stream that never yields fresh telemetry. The old per-component stream task and API stream can also remain alive indefinitely, so the task/cache leak is not fully fixed.
Related locations
- :
frequenz-microgrid-rs/src/client/microgrid_client_actor.rs
Lines 355 to 373 in 606e4e1
Endedis only sent before awaiting the next stream item. - : the new test relies on there being another sample/tick after the receiver is dropped.
frequenz-microgrid-rs/src/client/microgrid_client_handle.rs
Lines 420 to 423 in 606e4e1
Suggested fix
Make the stream task wake independently of incoming telemetry when there are no receivers. For example, wrap stream.next() in a tokio::select! with a small interval that checks tx.receiver_count() == 0 and sends StreamStatus::Ended, or introduce an explicit cancellation path tied to receiver lifetime.
Suggested tests
Add a resubscribe test where the mock stream stays open but idle after its last sample, drop the receiver after the stream task is parked on stream.next(), and verify that a later subscription starts a fresh API stream and receives telemetry.
Suggested new tests
#[tokio::test(start_paused = true)]
async fn test_resubscribing_after_idle_stream_restarts_stream() {
let api_client = MockMicrogridApiClient::new(
MockComponent::grid(1).with_children(vec![
MockComponent::meter(2)
.with_power(vec![4.0])
.with_silence_after_metrics(),
]),
);
let handle = MicrogridClientHandle::new_from_client(api_client);
let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
telemetry_rx.recv().await.unwrap();
tokio::task::yield_now().await;
drop(telemetry_rx);
tokio::time::advance(std::time::Duration::from_secs(2)).await;
let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
tokio::time::timeout(std::time::Duration::from_secs(1), telemetry_rx.recv())
.await
.expect("no telemetry after resubscribing to an idle stream")
.unwrap();
}🟡 Medium Severity: probably best to fix before merging, because this leaves one of the PR's main cache/leak scenarios unresolved.
| } | ||
| Err(broadcast::error::RecvError::Closed) => { | ||
| tracing::error!( | ||
| // The telemetry tracker upstream has shut down — a normal |
There was a problem hiding this comment.
🟡 Bounds trackers can keep snapshot trackers alive after consumers drop
The bounds tracker only checks pool_bounds_tx.receiver_count() after pool_status_rx.recv().await yields a new snapshot. If the caller drops the last bounds receiver after the last status change, and the pool status then stays unchanged, this task remains blocked in recv(). Its pool_status_rx still counts as a receiver of the pool snapshot stream, so the pool telemetry tracker sees receiver_count() > 0 and keeps running instead of shutting down.
This also affects resubscription: power_bounds() will reuse the leaked snapshot sender because that old bounds tracker is still subscribed, then the new bounds tracker subscribes to a broadcast channel that only delivers future snapshots. If the status remains stable and the pool tracker keeps taking the unchanged-skip path, the new bounds receiver may not get any value until some unrelated telemetry/status change occurs.
Impact
Dropping all power_bounds() consumers can still leak the bounds tracker, the pool telemetry tracker, and the component subscriptions. A later power_bounds() call can also hang without an initial bounds update in the stable-status case.
Related locations
- : the battery bounds tracker has the same blocking
frequenz-microgrid-rs/src/microgrid/battery_bounds_tracker.rs
Lines 61 to 90 in 606e4e1
recv()shape. - : unchanged PV snapshots are skipped, so a leaked bounds tracker may never be woken to notice its own receivers are gone.
- : the battery pool has the same unchanged-skip path.
Suggested fix
Give both bounds trackers their own shutdown check that does not depend on receiving another pool snapshot. A minimal fix is to use tokio::select! with a periodic tick and break when pool_bounds_tx.receiver_count() == 0; dropping pool_status_rx will then let the pool tracker see that it has no remaining consumers and shut down or be recreated cleanly.
Suggested tests
Add PV and battery tests that consume a bounds update, let the components reach a stable unchanged state, drop the bounds receiver, advance simulated time, and then call power_bounds() again. The new receiver should get a fresh update instead of timing out, and the old tracker should not keep the snapshot sender alive.
🟡 Medium Severity: probably best to fix before merging, because this preserves one of the task leak/stalled-resubscription paths the PR is meant to eliminate.
| } | ||
|
|
||
| pub async fn run(self) -> Result<(), Error> { | ||
| pub async fn run(self) { |
There was a problem hiding this comment.
🟡 Public tracker run() no longer returns Result
PvPoolTelemetryTracker and BatteryPoolTelemetryTracker are public and re-exported from the crate root, and this changes their public run() signature from pub async fn run(self) -> Result<(), Error> to pub async fn run(self). Downstream code that awaits the tracker and propagates or matches the result will stop compiling. The release notes currently list only bug fixes and do not call this out as a breaking API change.
Impact
This is a source-incompatible public API change. It also removes the only programmatic way for external callers to observe startup failures from run(), even though normal shutdown can still be treated as success.
Related locations
- :
BatteryPoolTelemetryTracker::run()has the same signature change. - : both tracker types are re-exported from the crate root.
frequenz-microgrid-rs/src/lib.rs
Lines 39 to 42 in 606e4e1
- : the release notes do not mention a breaking change.
frequenz-microgrid-rs/RELEASE_NOTES.md
Lines 14 to 18 in 606e4e1
Suggested fix
If the signature change is not intended, keep Result<(), Error> and return Ok(()) for normal shutdown while preserving Err(...) for startup/configuration failures. If it is intended, add a breaking-change release-note entry and consider applying the existing scope:breaking-change label.
Suggested tests
Add a public API/semver check or compile test that catches return-type changes on exported tracker methods before release.
🟡 Medium Severity: probably best to fix or explicitly document before merging, because this is a public source-compatibility break.
Telemetry and bounds trackers leaked their tasks — logging at error level on every tick — once all their consumers had dropped, and a pool recreated on the same client silently received no telemetry because the client actor kept a dead stream sender cached. This branch shuts the trackers down cleanly when their consumers are gone and evicts ended streams from the client cache.
Changes
receiver_count()each tick, covering the unchanged-partition skip path where a failedsend()would otherwise never fire to signal that the last receiver is gone.run()loops no longer returnResult(shutdown is not an error); the remaining startup failures now log at their source before the task exits instead of being propagated into a discardedJoinHandle.