-
Notifications
You must be signed in to change notification settings - Fork 5
Fix telemetry and bounds tracker task leaks when consumers drop #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v0.x.x
Are you sure you want to change the base?
Changes from all commits
242b90d
649cbc9
7c68309
3c75f24
951c6ef
2c246a9
606e4e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -65,7 +65,9 @@ where | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Err(broadcast::error::RecvError::Closed) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracing::error!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The telemetry tracker upstream has shut down — a normal | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Bounds trackers can keep snapshot trackers alive after consumers drop The bounds tracker only checks This also affects resubscription: ImpactDropping all Related locations
Suggested fixGive both bounds trackers their own shutdown check that does not depend on receiving another pool snapshot. A minimal fix is to use Suggested testsAdd PV and battery tests that consume a bounds update, let the components reach a stable unchanged state, drop the bounds receiver, advance simulated time, and then call 🟡 Medium Severity: probably best to fix before merging, because this preserves one of the task leak/stalled-resubscription paths the PR is meant to eliminate. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // teardown of the whole pool, not an error here. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracing::debug!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Pool status channel closed; {} PV bounds tracker shutting down.", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| M::str_name(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Idle component streams never send
EndedThe new eviction path only runs after the actor receives
StreamStatus::Ended, but the stream task sendsEndedonly from the top of its loop, before it awaits the next tonic item. If the last receiver drops while the tonic stream is idle but still open, the task is already parked instream.next().await; it never recheckstx.receiver_count(), never sendsEnded, andcomponent_streamskeeps the stale sender. A later subscription then takestx.subscribe()on that stale sender instead of creating a fresh API stream.The new test exercises a stream that still has another sample after the receiver is dropped, so the task wakes up and reaches the receiver-count check. It does not cover the idle-open stream case.
Impact
A pool recreated after its component streams have gone silent can still get a cached stream that never yields fresh telemetry. The old per-component stream task and API stream can also remain alive indefinitely, so the task/cache leak is not fully fixed.
Related locations
frequenz-microgrid-rs/src/client/microgrid_client_actor.rs
Lines 355 to 373 in 606e4e1
Endedis only sent before awaiting the next stream item.frequenz-microgrid-rs/src/client/microgrid_client_handle.rs
Lines 420 to 423 in 606e4e1
Suggested fix
Make the stream task wake independently of incoming telemetry when there are no receivers. For example, wrap
stream.next()in atokio::select!with a small interval that checkstx.receiver_count() == 0and sendsStreamStatus::Ended, or introduce an explicit cancellation path tied to receiver lifetime.Suggested tests
Add a resubscribe test where the mock stream stays open but idle after its last sample, drop the receiver after the stream task is parked on
stream.next(), and verify that a later subscription starts a fresh API stream and receives telemetry.Suggested new tests
🟡 Medium Severity: probably best to fix before merging, because this leaves one of the PR's main cache/leak scenarios unresolved.