Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,9 @@
- `BatteryPool::telemetry_snapshots()` exposes the same per-component health partition (`BatteryPoolSnapshot`).

- The pool telemetry trackers and snapshot types (`BatteryPoolSnapshot`, `PvPoolSnapshot`, `InverterBatteryGroup`, `InverterBatteryGroupStatus`) are now public and re-exported from the crate root.

## Bug Fixes

- The pool, group, and component telemetry trackers no longer leak their tasks (while logging at error level every tick) once their consumers are gone; normal shutdown is now logged at debug.

- The client now evicts ended per-component telemetry streams from its cache, so a pool recreated on the same client receives telemetry again instead of silently getting none.
23 changes: 23 additions & 0 deletions src/client/microgrid_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ impl<T: MicrogridApiClient> MicrogridClientActor<T> {
}
Some(StreamStatus::Ended(component_id)) => {
components_to_retry.remove(&component_id);
match component_streams.get(&component_id) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Idle component streams never send Ended

The new eviction path only runs after the actor receives StreamStatus::Ended, but the stream task sends Ended only from the top of its loop, before it awaits the next tonic item. If the last receiver drops while the tonic stream is idle but still open, the task is already parked in stream.next().await; it never rechecks tx.receiver_count(), never sends Ended, and component_streams keeps the stale sender. A later subscription then takes tx.subscribe() on that stale sender instead of creating a fresh API stream.

The new test exercises a stream that still has another sample after the receiver is dropped, so the task wakes up and reaches the receiver-count check. It does not cover the idle-open stream case.

Impact

A pool recreated after its component streams have gone silent can still get a cached stream that never yields fresh telemetry. The old per-component stream task and API stream can also remain alive indefinitely, so the task/cache leak is not fully fixed.

Related locations

  • loop {
    if tx.receiver_count() == 0 {
    tracing::debug!(
    "Dropping ComponentData stream for component_id:{:?}",
    electrical_component_id
    );
    stream_status_tx
    .send(StreamStatus::Ended(electrical_component_id))
    .await
    .unwrap_or_else(|e| {
    tracing::error!(
    "Failed to send stream ended message for {:?}: {:?}",
    electrical_component_id,
    e
    );
    });
    return;
    }
    let message = match stream.next().await {
    : Ended is only sent before awaiting the next stream item.
  • // Let the stream task notice that all receivers are gone (it checks
    // before each sample) and the actor process its Ended status, which
    // must evict the cached sender.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    : the new test relies on there being another sample/tick after the receiver is dropped.

Suggested fix

Make the stream task wake independently of incoming telemetry when there are no receivers. For example, wrap stream.next() in a tokio::select! with a small interval that checks tx.receiver_count() == 0 and sends StreamStatus::Ended, or introduce an explicit cancellation path tied to receiver lifetime.

Suggested tests

Add a resubscribe test where the mock stream stays open but idle after its last sample, drop the receiver after the stream task is parked on stream.next(), and verify that a later subscription starts a fresh API stream and receives telemetry.

Suggested new tests
#[tokio::test(start_paused = true)]
async fn test_resubscribing_after_idle_stream_restarts_stream() {
    let api_client = MockMicrogridApiClient::new(
        MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2)
                .with_power(vec![4.0])
                .with_silence_after_metrics(),
        ]),
    );
    let handle = MicrogridClientHandle::new_from_client(api_client);

    let mut telemetry_rx = handle
        .receive_electrical_component_telemetry_stream(2)
        .await
        .unwrap();
    telemetry_rx.recv().await.unwrap();

    tokio::task::yield_now().await;
    drop(telemetry_rx);
    tokio::time::advance(std::time::Duration::from_secs(2)).await;

    let mut telemetry_rx = handle
        .receive_electrical_component_telemetry_stream(2)
        .await
        .unwrap();
    tokio::time::timeout(std::time::Duration::from_secs(1), telemetry_rx.recv())
        .await
        .expect("no telemetry after resubscribing to an idle stream")
        .unwrap();
}

🟡 Medium Severity: probably best to fix before merging, because this leaves one of the PR's main cache/leak scenarios unresolved.

// A subscriber arrived between the stream
// task's no-receivers check and this message;
// the task is gone, so restart the stream for
// the new subscriber.
Some(tx) if tx.receiver_count() > 0 => {
let tx = tx.clone();
start_electrical_component_telemetry_stream(
&mut self.client,
component_id,
tx,
stream_status_tx.clone(),
)
.await;
}
// Drop the cached sender: nothing writes to it
// anymore, so handing out further subscriptions
// on it would never yield data. The next
// subscription starts a fresh stream instead.
_ => {
component_streams.remove(&component_id);
}
}
}
None => {
tracing::error!("MicrogridClientActor: Stream status channel closed, exiting.");
Expand Down
28 changes: 28 additions & 0 deletions src/client/microgrid_client_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,32 @@ mod tests {
]
);
}

#[tokio::test(start_paused = true)]
async fn test_resubscribing_after_all_receivers_dropped_restarts_stream() {
let handle = new_client_handle();

let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
telemetry_rx.recv().await.unwrap();
drop(telemetry_rx);

// Let the stream task notice that all receivers are gone (it checks
// before each sample) and the actor process its Ended status, which
// must evict the cached sender.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// A fresh subscription must get a live stream again, not a
// subscription on the dead cached channel.
let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
tokio::time::timeout(std::time::Duration::from_secs(10), telemetry_rx.recv())
.await
.expect("no telemetry after resubscribing; stale stream cache?")
.unwrap();
}
}
4 changes: 3 additions & 1 deletion src/microgrid/battery_bounds_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ where
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::error!(
// The telemetry tracker upstream has shut down — a normal
// teardown of the whole pool, not an error here.
tracing::debug!(
"Pool status channel closed; {}/{} bounds tracker shutting down.",
InverterM::str_name(),
BatteryM::str_name(),
Expand Down
4 changes: 3 additions & 1 deletion src/microgrid/pv_bounds_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ where
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::error!(
// The telemetry tracker upstream has shut down — a normal

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Bounds trackers can keep snapshot trackers alive after consumers drop

The bounds tracker only checks pool_bounds_tx.receiver_count() after pool_status_rx.recv().await yields a new snapshot. If the caller drops the last bounds receiver after the last status change, and the pool status then stays unchanged, this task remains blocked in recv(). Its pool_status_rx still counts as a receiver of the pool snapshot stream, so the pool telemetry tracker sees receiver_count() > 0 and keeps running instead of shutting down.

This also affects resubscription: power_bounds() will reuse the leaked snapshot sender because that old bounds tracker is still subscribed, then the new bounds tracker subscribes to a broadcast channel that only delivers future snapshots. If the status remains stable and the pool tracker keeps taking the unchanged-skip path, the new bounds receiver may not get any value until some unrelated telemetry/status change occurs.

Impact

Dropping all power_bounds() consumers can still leak the bounds tracker, the pool telemetry tracker, and the component subscriptions. A later power_bounds() call can also hang without an initial bounds update in the stable-status case.

Related locations

  • pub(crate) async fn run(mut self) {
    loop {
    match self.pool_status_rx.recv().await {
    Ok(pool_status) => {
    let bounds = Self::compute_pool_bounds(&pool_status);
    if self.pool_bounds_tx.send(bounds).is_err() {
    tracing::debug!(
    "No receivers for {}/{} bounds tracker; shutting down.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    break;
    }
    }
    Err(broadcast::error::RecvError::Lagged(n)) => {
    tracing::warn!(
    "{}/{} bounds tracker lagged by {n} pool status updates.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    }
    Err(broadcast::error::RecvError::Closed) => {
    // The telemetry tracker upstream has shut down — a normal
    // teardown of the whole pool, not an error here.
    tracing::debug!(
    "Pool status channel closed; {}/{} bounds tracker shutting down.",
    InverterM::str_name(),
    BatteryM::str_name(),
    );
    break;
    : the battery bounds tracker has the same blocking recv() shape.
  • // The unchanged-skip below means a stable partition never
    // reaches `send()`, whose failure is otherwise the only
    // signal that every receiver has dropped. Check for that
    // here so the tracker still shuts down instead of leaking.
    if self.component_pool_status_tx.receiver_count() == 0 {
    break;
    }
    // Skip sending if the partitioning hasn't changed.
    let unchanged = last_sent.as_ref().is_some_and(|s| {
    s.healthy_inverters == healthy_inverters
    && s.unhealthy_inverters == unhealthy_inverters
    });
    if unchanged {
    continue;
    : unchanged PV snapshots are skipped, so a leaked bounds tracker may never be woken to notice its own receivers are gone.
  • // The unchanged-skip below means a stable partition never
    // reaches `send()`, whose failure is otherwise the only
    // signal that every receiver has dropped. Check for that
    // here so the tracker still shuts down instead of leaking.
    if self.component_pool_status_tx.receiver_count() == 0 {
    break;
    }
    if last_sent_status.as_ref() == Some(&inverter_battery_group_data) {
    continue; // Skip sending if the status hasn't changed
    : the battery pool has the same unchanged-skip path.

Suggested fix

Give both bounds trackers their own shutdown check that does not depend on receiving another pool snapshot. A minimal fix is to use tokio::select! with a periodic tick and break when pool_bounds_tx.receiver_count() == 0; dropping pool_status_rx will then let the pool tracker see that it has no remaining consumers and shut down or be recreated cleanly.

Suggested tests

Add PV and battery tests that consume a bounds update, let the components reach a stable unchanged state, drop the bounds receiver, advance simulated time, and then call power_bounds() again. The new receiver should get a fresh update instead of timing out, and the old tracker should not keep the snapshot sender alive.


🟡 Medium Severity: probably best to fix before merging, because this preserves one of the task leak/stalled-resubscription paths the PR is meant to eliminate.

// teardown of the whole pool, not an error here.
tracing::debug!(
"Pool status channel closed; {} PV bounds tracker shutting down.",
M::str_name(),
);
Expand Down
63 changes: 45 additions & 18 deletions src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ impl BatteryPoolTelemetryTracker {

while let Some(battery_id) = unvisited_batteries.iter().next().cloned() {
let group_inverters = graph
.predecessors(battery_id)?
.predecessors(battery_id)
.map_err(|e| {
tracing::error!("Failed to query predecessors of battery {battery_id}: {e}");
e
})?
.filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter)
.map(|c| c.id)
.collect::<BTreeSet<_>>();
Expand All @@ -102,7 +106,13 @@ impl BatteryPoolTelemetryTracker {
let mut group_batteries = BTreeSet::new();
for inverter_id in &group_inverters {
let connected_batteries = graph
.successors(*inverter_id)?
.successors(*inverter_id)
.map_err(|e| {
tracing::error!(
"Failed to query successors of inverter {inverter_id}: {e}"
);
e
})?
.map(|c| c.id)
.collect::<BTreeSet<_>>();

Expand All @@ -129,7 +139,13 @@ impl BatteryPoolTelemetryTracker {
// Ensure that group batteries are only connect to group inverters
for battery_id in &group_batteries {
let connected_inverters = graph
.predecessors(*battery_id)?
.predecessors(*battery_id)
.map_err(|e| {
tracing::error!(
"Failed to query predecessors of battery {battery_id}: {e}"
);
e
})?
.filter(|c| {
c.category() == crate::client::ElectricalComponentCategory::Inverter
})
Expand All @@ -152,10 +168,13 @@ impl BatteryPoolTelemetryTracker {
Ok(groups)
}

pub async fn run(self) -> Result<(), Error> {
pub async fn run(self) {
let mut inverter_battery_group_data = HashMap::new();

let inverter_battery_group_ids = self.get_inverter_battery_groups()?;
// Errors are logged at source inside `get_inverter_battery_groups`.
let Ok(inverter_battery_group_ids) = self.get_inverter_battery_groups() else {
return;
};

let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100);
for inverter_battery_group in inverter_battery_group_ids {
Expand Down Expand Up @@ -185,36 +204,44 @@ impl BatteryPoolTelemetryTracker {
inverter_battery_group_data.insert(group_ids, status);
}
// Every group tracker has exited and dropped its sender,
// so no further updates will arrive. The `interval.tick()`
// arm never disables, so the `select!` `else` can never
// run; break here instead.
// so no further updates will ever arrive. The `_ =
// interval.tick()` arm below is a catch-all that never
// disables, so the `select!` `else` branch can never run;
// break here instead.
None => break,
}
},
_ = interval.tick() => {
// The unchanged-skip below means a stable partition never
// reaches `send()`, whose failure is otherwise the only
// signal that every receiver has dropped. Check for that
// here so the tracker still shuts down instead of leaking.
if self.component_pool_status_tx.receiver_count() == 0 {
break;
}
if last_sent_status.as_ref() == Some(&inverter_battery_group_data) {
continue; // Skip sending if the status hasn't changed
}
if let Err(e) = self.component_pool_status_tx.send(
BatteryPoolSnapshot(inverter_battery_group_data.clone())
)
if self
.component_pool_status_tx
.send(BatteryPoolSnapshot(inverter_battery_group_data.clone()))
.is_err()
{
tracing::error!("Failed to send pool snapshot: {}", e);
// All receivers dropped between the check above and here;
// a normal shutdown, recorded by the terminal log below.
break;
}
last_sent_status = Some(inverter_battery_group_data.clone());
},
}
}

let err = format!(
"BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.",
// Reaching here means every group tracker exited or every receiver
// dropped — a normal shutdown, not an error.
tracing::debug!(
"BatteryPoolTelemetryTracker (component IDs {:?}) stopped: all group trackers or receivers are gone.",
self.component_ids
);

tracing::error!("{}", err);

Err(Error::component_data_error(err))
}
}

Expand Down
24 changes: 20 additions & 4 deletions src/microgrid/telemetry_tracker/component_telemetry_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,25 @@ impl ComponentTelemetryTracker {
// Reset the interval timer on receiving valid data
interval.reset();
let status = self.state_from_data(data);
if let Err(e) = self.component_status_tx.send(status).await {
tracing::error!("Failed to send component status: {}", e);
if self.component_status_tx.send(status).await.is_err() {
// The pool tracker dropped its receiver; there is
// nothing left to report to, so stop tracking
// instead of looping and logging forever.
tracing::debug!(
"Component {} telemetry tracker stopping: pool tracker dropped its receiver.",
self.component_id
);
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!(
"Component {} telemetry tracker stopping: telemetry stream closed.",
self.component_id
);
drop(self.component_status_tx);
break;
}
Expand All @@ -95,8 +106,13 @@ impl ComponentTelemetryTracker {
_ = interval.tick() => {
// If we reach here, it means no data was received within the tolerance period
let status = ComponentHealthStatus::Unhealthy(self.component_id, None);
if let Err(e) = self.component_status_tx.send(status).await {
tracing::error!("Failed to send component status: {}", e);
if self.component_status_tx.send(status).await.is_err() {
// The pool tracker dropped its receiver; stop tracking.
tracing::debug!(
"Component {} telemetry tracker stopping: pool tracker dropped its receiver.",
self.component_id
);
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
use tokio::select;

use crate::{
Error, MicrogridClientHandle,
MicrogridClientHandle,
client::proto::common::microgrid::electrical_components::{
ElectricalComponentStateCode, ElectricalComponentTelemetry,
},
Expand Down Expand Up @@ -76,7 +76,7 @@ impl InverterBatteryGroupTelemetryTracker {
}
}

pub async fn run(self) -> Result<(), Error> {
pub async fn run(self) {
let mut healthy_inverters = HashMap::new();
let mut unhealthy_inverters = HashMap::new();
let mut healthy_batteries = HashMap::new();
Expand All @@ -85,10 +85,19 @@ impl InverterBatteryGroupTelemetryTracker {
let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100);

for &inverter_id in &self.inverter_battery_group.inverter_ids {
let component_data_stream = self
let component_data_stream = match self
.client
.receive_electrical_component_telemetry_stream(inverter_id)
.await?;
.await
{
Ok(stream) => stream,
Err(e) => {
tracing::error!(
"Internal error opening telemetry stream for inverter {inverter_id}: {e}; inverter-battery group tracker aborting.",
);
return;
}
};
let tracker = ComponentTelemetryTracker::new(
inverter_id,
self.missing_data_tolerance,
Expand All @@ -107,10 +116,19 @@ impl InverterBatteryGroupTelemetryTracker {
let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100);

for &battery_id in &self.inverter_battery_group.battery_ids {
let component_data_stream = self
let component_data_stream = match self
.client
.receive_electrical_component_telemetry_stream(battery_id)
.await?;
.await
{
Ok(stream) => stream,
Err(e) => {
tracing::error!(
"Internal error opening telemetry stream for battery {battery_id}: {e}; inverter-battery group tracker aborting.",
);
return;
}
};
let tracker = ComponentTelemetryTracker::new(
battery_id,
self.missing_data_tolerance,
Expand All @@ -136,9 +154,13 @@ impl InverterBatteryGroupTelemetryTracker {
select! {
inverter_status = inverter_status_rx.recv() => {
let Some(inverter_status) = inverter_status else {
let e = String::from("Inverter telemetry tracker stopped receiving status updates.");
tracing::error!("{}", e);
return Err(Error::component_data_error(e));
// Every inverter component tracker has exited and dropped
// its sender — a normal shutdown, not an error.
tracing::debug!(
"Inverter-battery group tracker (inverters {:?}) stopping: all inverter component trackers have exited.",
self.inverter_battery_group.inverter_ids
);
return;
};
match inverter_status {
ComponentHealthStatus::Healthy(component_id, data) => {
Expand All @@ -152,12 +174,14 @@ impl InverterBatteryGroupTelemetryTracker {
}
},
battery_status = battery_status_rx.recv() => {
let Some(battery_status) = battery_status else {
let e = String::from(
"Battery telemetry tracker stopped receiving status updates."
let Some(battery_status) = battery_status else {
// Every battery component tracker has exited and dropped
// its sender — a normal shutdown, not an error.
tracing::debug!(
"Inverter-battery group tracker (batteries {:?}) stopping: all battery component trackers have exited.",
self.inverter_battery_group.battery_ids
);
tracing::error!("{}", e);
return Err(Error::component_data_error(e));
return;
};
match battery_status {
ComponentHealthStatus::Healthy(component_id, data) => {
Expand All @@ -171,7 +195,7 @@ impl InverterBatteryGroupTelemetryTracker {
}
}
}
if let Err(e) = self
if self
.status_tx
.send((
self.inverter_battery_group.clone(),
Expand All @@ -183,12 +207,14 @@ impl InverterBatteryGroupTelemetryTracker {
},
))
.await
.is_err()
{
tracing::error!("Failed to send inverter-battery group status: {}", e);
return Err(Error::component_data_error(format!(
"Failed to send inverter-battery group status: {}",
e
)));
// The pool tracker dropped its receiver — a normal shutdown.
tracing::debug!(
"Inverter-battery group tracker {:?} stopping: the pool tracker dropped its receiver.",
self.inverter_battery_group
);
return;
}
}
}
Expand Down
Loading
Loading