Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,9 @@
- `BatteryPool::telemetry_snapshots()` exposes the same per-component health partition (`BatteryPoolSnapshot`).

- The pool telemetry trackers and snapshot types (`BatteryPoolSnapshot`, `PvPoolSnapshot`, `InverterBatteryGroup`, `InverterBatteryGroupStatus`) are now public and re-exported from the crate root.

## Bug Fixes

- The pool, group, and component telemetry trackers no longer leak their tasks (while logging at error level every tick) once their consumers are gone; normal shutdown is now logged at debug.

- The client now evicts ended per-component telemetry streams from its cache, so a pool recreated on the same client receives telemetry again instead of silently getting none.
23 changes: 23 additions & 0 deletions src/client/microgrid_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ impl<T: MicrogridApiClient> MicrogridClientActor<T> {
}
Some(StreamStatus::Ended(component_id)) => {
components_to_retry.remove(&component_id);
match component_streams.get(&component_id) {
// A subscriber arrived between the stream
// task's no-receivers check and this message;
// the task is gone, so restart the stream for
// the new subscriber.
Some(tx) if tx.receiver_count() > 0 => {
let tx = tx.clone();
start_electrical_component_telemetry_stream(
&mut self.client,
component_id,
tx,
stream_status_tx.clone(),
)
.await;
}
// Drop the cached sender: nothing writes to it
// anymore, so handing out further subscriptions
// on it would never yield data. The next
// subscription starts a fresh stream instead.
_ => {
component_streams.remove(&component_id);
}
}
}
None => {
tracing::error!("MicrogridClientActor: Stream status channel closed, exiting.");
Expand Down
28 changes: 28 additions & 0 deletions src/client/microgrid_client_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,32 @@ mod tests {
]
);
}

#[tokio::test(start_paused = true)]
async fn test_resubscribing_after_all_receivers_dropped_restarts_stream() {
let handle = new_client_handle();

let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
telemetry_rx.recv().await.unwrap();
drop(telemetry_rx);

// Let the stream task notice that all receivers are gone (it checks
// before each sample) and the actor process its Ended status, which
// must evict the cached sender.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// A fresh subscription must get a live stream again, not a
// subscription on the dead cached channel.
let mut telemetry_rx = handle
.receive_electrical_component_telemetry_stream(2)
.await
.unwrap();
tokio::time::timeout(std::time::Duration::from_secs(10), telemetry_rx.recv())
.await
.expect("no telemetry after resubscribing; stale stream cache?")
.unwrap();
}
}
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ pub(crate) mod wall_clock_timer;

mod microgrid;
pub use microgrid::{
BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup,
InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, PvPoolTelemetryTracker,
BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, ComponentHealthPartition,
InverterBatteryGroup, InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot,
PvPoolTelemetryTracker,
};

#[cfg(any(test, feature = "test-utils"))]
Expand Down
7 changes: 7 additions & 0 deletions src/microgrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
//! High-level interface for the Microgrid API.

mod bounds_aggregation;
mod pool_bounds_tracker;
mod pool_broadcast;
mod pool_validation;

#[cfg(test)]
mod test_support;

mod battery_bounds_tracker;
mod battery_pool;
Expand All @@ -17,6 +23,7 @@ pub(crate) mod telemetry_tracker;
pub use telemetry_tracker::battery_pool_telemetry_tracker::{
BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup,
};
pub use telemetry_tracker::component_partition::ComponentHealthPartition;
pub use telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus;
pub use telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker};

Expand Down
Loading
Loading