Skip to content

Fix telemetry and bounds tracker task leaks when consumers drop#50

Open
shsms wants to merge 7 commits into
frequenz-floss:v0.x.xfrom
shsms:pv-pool-fixes
Open

Fix telemetry and bounds tracker task leaks when consumers drop#50
shsms wants to merge 7 commits into
frequenz-floss:v0.x.xfrom
shsms:pv-pool-fixes

Conversation

@shsms

@shsms shsms commented Jun 16, 2026

Copy link
Copy Markdown
Collaborator

Telemetry and bounds trackers leaked their tasks — logging at error level on every tick — once all their consumers had dropped, and a pool recreated on the same client silently received no telemetry because the client actor kept a dead stream sender cached. This branch shuts the trackers down cleanly when their consumers are gone and evicts ended streams from the client cache.

Changes

  • Pool, group, and component telemetry trackers and the PV/battery bounds trackers now shut down (logged at debug) when all their receivers/consumers have dropped, instead of leaking the task and logging an error every tick.
  • Pool telemetry trackers check receiver_count() each tick, covering the unchanged-partition skip path where a failed send() would otherwise never fire to signal that the last receiver is gone.
  • The client actor evicts an ended per-component telemetry stream from its cache, so a pool recreated on the same client starts a fresh stream and receives telemetry again — restarting the stream in place if a subscriber raced in before the eviction.
  • The tracker run() loops no longer return Result (shutdown is not an error); the remaining startup failures now log at their source before the task exits instead of being propagated into a discarded JoinHandle.

shsms added 7 commits June 11, 2026 12:33
The run loop logged "Failed to send component status" on every sample and
every missing-data tick once the pool tracker dropped its mpsc receiver, but
never exited, leaking the task and its broadcast subscription for the life of
the process (scaling with each pool recreation). Break the loop on send
failure, matching the RecvError::Closed arm, so the tracker shuts down when
there is nothing left to report to.

Fixes frequenz-floss#43.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`RecvError::Closed` means the upstream telemetry tracker dropped its sender,
which happens on a normal teardown of the pool — the bounds tracker has nothing
left to aggregate. Log it at debug rather than error, matching the
no-receivers path just above.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The tick arm skips `send()` whenever the partitioning is unchanged, but a failed
`send()` is the only thing that tells the tracker every receiver has gone. With
a stable partition (e.g. all components silent, each re-emitting the same
unhealthy status) the snapshot never changes, so the skip path is taken every
tick and the dropped receivers are never noticed — the tracker and its
component trackers leak until process exit.

Check `receiver_count()` each tick, before the unchanged-skip, and break when it
reaches zero. Applies to both the PV and battery pool telemetry trackers.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Both pool telemetry trackers now treat every exit from `run` as a normal
shutdown: the loop ends when every component tracker has gone or every receiver
has dropped, neither of which is an error. The trailing `error!` + `Err` return
became a `debug!`, and the in-loop send-failure no longer logs at error.

With shutdown no longer fallible, `run` has nothing left to report, so drop its
`Result` return (the call sites only `tokio::spawn` it and discard the result).
The remaining startup failures -- an empty component set and opening a
component's telemetry stream -- now log at their source before the task exits,
instead of being propagated into a discarded `JoinHandle`.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The group tracker logged at error and returned `Err` on three paths that are
all ordinary shutdown: its inverter or battery component trackers all exiting
(`recv` yields `None`), and the pool tracker dropping its receiver (the send to
it fails). The last one fired on every `BatteryPool` teardown, so each group
tracker spammed "Failed to send inverter-battery group status" at error level.

Log these at debug and just return, matching how the pool telemetry trackers
already treat their own shutdown. With shutdown no longer an error, `run` has
nothing left to fail with, so drop its `Result` return; the remaining startup
failures -- opening a component's telemetry stream -- now log at their source
before the task exits, instead of being propagated into a discarded
`JoinHandle`.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The per-component stream task exits once it sees no receivers and reports
`StreamStatus::Ended`, but the actor only removed the component from the
retry map — the now-dead `broadcast::Sender` stayed cached in
`component_streams`. Any later subscription for that component was handed
`tx.subscribe()` on that dead sender without a new tonic stream being
started, so it never received telemetry. With the telemetry trackers now
exiting cleanly instead of leaking (which used to hold receivers forever
and mask this), every drop-pool/rebuild cycle hit it: the rebuilt pool saw
its components as permanently silent.

On `Ended`, drop the cached sender so the next subscription starts a fresh
stream. If a subscriber arrived in the window between the stream task's
no-receivers check and the actor processing `Ended`, the entry still has
receivers — restart the stream for them instead of evicting it, since the
old task is already gone either way.

Fixes frequenz-floss#42.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>

@llucax llucax left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 rAIview (AI review here!)

I found three issues worth addressing before merge: two lifecycle edge cases where tasks/cache entries can still survive after consumers drop or resubscriptions can stall, and one public API compatibility break from changing exported tracker run() methods to return ().

}
Some(StreamStatus::Ended(component_id)) => {
components_to_retry.remove(&component_id);
match component_streams.get(&component_id) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Idle component streams never send Ended

The new eviction path only runs after the actor receives StreamStatus::Ended, but the stream task sends Ended only from the top of its loop, before it awaits the next tonic item. If the last receiver drops while the tonic stream is idle but still open, the task is already parked in stream.next().await; it never rechecks tx.receiver_count(), never sends Ended, and component_streams keeps the stale sender. A later subscription then takes tx.subscribe() on that stale sender instead of creating a fresh API stream.

The new test exercises a stream that still has another sample after the receiver is dropped, so the task wakes up and reaches the receiver-count check. It does not cover the idle-open stream case.

Impact

A pool recreated after its component streams have gone silent can still get a cached stream that never yields fresh telemetry. The old per-component stream task and API stream can also remain alive indefinitely, so the task/cache leak is not fully fixed.

Related locations

  • loop {
    if tx.receiver_count() == 0 {
    tracing::debug!(
    "Dropping ComponentData stream for component_id:{:?}",
    electrical_component_id
    );
    stream_status_tx
    .send(StreamStatus::Ended(electrical_component_id))
    .await
    .unwrap_or_else(|e| {
    tracing::error!(
    "Failed to send stream ended message for {:?}: {:?}",
    electrical_component_id,
    e
    );
    });
    return;
    }
    let message = match stream.next().await {
    : Ended is only sent before awaiting the next stream item.
  • // Let the stream task notice that all receivers are gone (it checks
    // before each sample) and the actor process its Ended status, which
    // must evict the cached sender.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    : the new test relies on there being another sample/tick after the receiver is dropped.

Suggested fix

Make the stream task wake independently of incoming telemetry when there are no receivers. For example, wrap stream.next() in a tokio::select! with a small interval that checks tx.receiver_count() == 0 and sends StreamStatus::Ended, or introduce an explicit cancellation path tied to receiver lifetime.

Suggested tests

Add a resubscribe test where the mock stream stays open but idle after its last sample, drop the receiver after the stream task is parked on stream.next(), and verify that a later subscription starts a fresh API stream and receives telemetry.

Suggested new tests
#[tokio::test(start_paused = true)]
async fn test_resubscribing_after_idle_stream_restarts_stream() {
    let api_client = MockMicrogridApiClient::new(
        MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2)
                .with_power(vec![4.0])
                .with_silence_after_metrics(),
        ]),
    );
    let handle = MicrogridClientHandle::new_from_client(api_client);

    let mut telemetry_rx = handle
        .receive_electrical_component_telemetry_stream(2)
        .await
        .unwrap();
    telemetry_rx.recv().await.unwrap();

    tokio::task::yield_now().await;
    drop(telemetry_rx);
    tokio::time::advance(std::time::Duration::from_secs(2)).await;

    let mut telemetry_rx = handle
        .receive_electrical_component_telemetry_stream(2)
        .await
        .unwrap();
    tokio::time::timeout(std::time::Duration::from_secs(1), telemetry_rx.recv())
        .await
        .expect("no telemetry after resubscribing to an idle stream")
        .unwrap();
}

🟡 Medium Severity: probably best to fix before merging, because this leaves one of the PR's main cache/leak scenarios unresolved.

}
Err(broadcast::error::RecvError::Closed) => {
tracing::error!(
// The telemetry tracker upstream has shut down — a normal

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Bounds trackers can keep snapshot trackers alive after consumers drop

The bounds tracker only checks pool_bounds_tx.receiver_count() after pool_status_rx.recv().await yields a new snapshot. If the caller drops the last bounds receiver after the last status change, and the pool status then stays unchanged, this task remains blocked in recv(). Its pool_status_rx still counts as a receiver of the pool snapshot stream, so the pool telemetry tracker sees receiver_count() > 0 and keeps running instead of shutting down.

This also affects resubscription: power_bounds() will reuse the leaked snapshot sender because that old bounds tracker is still subscribed, then the new bounds tracker subscribes to a broadcast channel that only delivers future snapshots. If the status remains stable and the pool tracker keeps taking the unchanged-skip path, the new bounds receiver may not get any value until some unrelated telemetry/status change occurs.

Impact

Dropping all power_bounds() consumers can still leak the bounds tracker, the pool telemetry tracker, and the component subscriptions. A later power_bounds() call can also hang without an initial bounds update in the stable-status case.

Related locations

  • pub(crate) async fn run(mut self) {
    loop {
    match self.pool_status_rx.recv().await {
    Ok(pool_status) => {
    let bounds = Self::compute_pool_bounds(&pool_status);
    if self.pool_bounds_tx.send(bounds).is_err() {
    tracing::debug!(
    "No receivers for {}/{} bounds tracker; shutting down.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    break;
    }
    }
    Err(broadcast::error::RecvError::Lagged(n)) => {
    tracing::warn!(
    "{}/{} bounds tracker lagged by {n} pool status updates.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    }
    Err(broadcast::error::RecvError::Closed) => {
    // The telemetry tracker upstream has shut down — a normal
    // teardown of the whole pool, not an error here.
    tracing::debug!(
    "Pool status channel closed; {}/{} bounds tracker shutting down.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    break;
    : the battery bounds tracker has the same blocking recv() shape.
  • // The unchanged-skip below means a stable partition never
    // reaches `send()`, whose failure is otherwise the only
    // signal that every receiver has dropped. Check for that
    // here so the tracker still shuts down instead of leaking.
    if self.component_pool_status_tx.receiver_count() == 0 {
    break;
    }
    // Skip sending if the partitioning hasn't changed.
    let unchanged = last_sent.as_ref().is_some_and(|s| {
    s.healthy_inverters == healthy_inverters
    && s.unhealthy_inverters == unhealthy_inverters
    });
    if unchanged {
    continue;
    : unchanged PV snapshots are skipped, so a leaked bounds tracker may never be woken to notice its own receivers are gone.
  • // The unchanged-skip below means a stable partition never
    // reaches `send()`, whose failure is otherwise the only
    // signal that every receiver has dropped. Check for that
    // here so the tracker still shuts down instead of leaking.
    if self.component_pool_status_tx.receiver_count() == 0 {
    break;
    }
    if last_sent_status.as_ref() == Some(&inverter_battery_group_data) {
    continue; // Skip sending if the status hasn't changed
    : the battery pool has the same unchanged-skip path.

Suggested fix

Give both bounds trackers their own shutdown check that does not depend on receiving another pool snapshot. A minimal fix is to use tokio::select! with a periodic tick and break when pool_bounds_tx.receiver_count() == 0; dropping pool_status_rx will then let the pool tracker see that it has no remaining consumers and shut down or be recreated cleanly.

Suggested tests

Add PV and battery tests that consume a bounds update, let the components reach a stable unchanged state, drop the bounds receiver, advance simulated time, and then call power_bounds() again. The new receiver should get a fresh update instead of timing out, and the old tracker should not keep the snapshot sender alive.


🟡 Medium Severity: probably best to fix before merging, because this preserves one of the task leak/stalled-resubscription paths the PR is meant to eliminate.

}

pub async fn run(self) -> Result<(), Error> {
pub async fn run(self) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Public tracker run() no longer returns Result

PvPoolTelemetryTracker and BatteryPoolTelemetryTracker are public and re-exported from the crate root, and this changes their public run() signature from pub async fn run(self) -> Result<(), Error> to pub async fn run(self). Downstream code that awaits the tracker and propagates or matches the result will stop compiling. The release notes currently list only bug fixes and do not call this out as a breaking API change.

Impact

This is a source-incompatible public API change. It also removes the only programmatic way for external callers to observe startup failures from run(), even though normal shutdown can still be treated as success.

Related locations

  • : BatteryPoolTelemetryTracker::run() has the same signature change.
  • pub use microgrid::{
    BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup,
    InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, PvPoolTelemetryTracker,
    };
    : both tracker types are re-exported from the crate root.
  • ## Bug Fixes
    - The pool, group, and component telemetry trackers no longer leak their tasks (while logging at error level every tick) once their consumers are gone; normal shutdown is now logged at debug.
    - The client now evicts ended per-component telemetry streams from its cache, so a pool recreated on the same client receives telemetry again instead of silently getting none.
    : the release notes do not mention a breaking change.

Suggested fix

If the signature change is not intended, keep Result<(), Error> and return Ok(()) for normal shutdown while preserving Err(...) for startup/configuration failures. If it is intended, add a breaking-change release-note entry and consider applying the existing scope:breaking-change label.

Suggested tests

Add a public API/semver check or compile test that catches return-type changes on exported tracker methods before release.


🟡 Medium Severity: probably best to fix or explicitly document before merging, because this is a public source-compatibility break.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

2 participants