Deduplicate shared pool code into reusable helpers #51

Draft
shsms wants to merge 12 commits into frequenz-floss:v0.x.x from shsms:pool-dedup

shsms (Collaborator) commented Jun 17, 2026

The Battery and PV pool implementations carried large amounts of byte-for-byte
duplicated logic. This branch factors the shared parts into single
implementations, with no intended behavior change.

Changes

  • Extract try_reuse for the broadcast weak-sender reuse gate that
    power_bounds/telemetry_snapshots repeated on both pools.
  • Replace the identical PvPoolBoundsTracker/BatteryPoolBoundsTracker run
    loops with one generic PoolBoundsTracker<S, Q> taking the per-pool
    aggregation as a fn(&S) -> Vec<Bounds<Q>>.
  • Introduce ComponentHealthPartition and rebuild PvPoolSnapshot and
    InverterBatteryGroupStatus on it. This changes the public API:
    ComponentHealthPartition is a new crate-root export, and the fields of
    the two snapshot types are reshaped to carry it.
  • Extract validate_pool_ids for the shared component-ID checks.
  • Collect duplicated test helpers into a shared test_support module.

shsms added 12 commits June 11, 2026 09:21
`PvPoolBoundsTracker` and `BatteryPoolBoundsTracker` carried byte-for-byte
identical `run` loops — receive a snapshot, compute its bounds, broadcast them,
and handle lag/closure — differing only in the per-pool aggregation and the log
label. Replace both with a single generic `PoolBoundsTracker<S, Q>` that takes
the aggregation as a plain `fn(&S) -> Vec<Bounds<Q>>` and a label string.

The pool-specific aggregation stays in `pv_bounds_tracker` and
`battery_bounds_tracker` as `compute_pool_bounds` free functions (with their
existing tests), now passed into the shared tracker.
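
As a rough illustration of the shape described above, the sketch below uses
`std::sync::mpsc` channels in place of the crate's broadcast channels so it runs
without an async runtime. `Bounds`, `PvSnapshot`, their fields, and the summing
aggregation are placeholders, not the crate's definitions; only the
`fn(&S) -> Vec<Bounds<Q>>` parameter and the label string come from this
message. Stopping on a failed send is a simplification of this sketch.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Placeholder bounds type; the real `Bounds<Q>` may look different.
#[derive(Debug, Clone, PartialEq)]
pub struct Bounds<Q> {
    pub lower: Q,
    pub upper: Q,
}

/// One tracker for every pool kind: the per-pool part is just a function pointer.
pub struct PoolBoundsTracker<S, Q> {
    compute: fn(&S) -> Vec<Bounds<Q>>,
    label: &'static str,
}

impl<S, Q> PoolBoundsTracker<S, Q> {
    pub fn new(compute: fn(&S) -> Vec<Bounds<Q>>, label: &'static str) -> Self {
        Self { compute, label }
    }

    /// Receive snapshots, compute bounds, forward them. Returns how many
    /// snapshots were forwarded before either side of the pipeline closed.
    pub fn run(&self, snapshots: Receiver<S>, bounds_tx: Sender<Vec<Bounds<Q>>>) -> usize {
        let mut forwarded = 0;
        // `recv` fails once the upstream sender is dropped (normal teardown).
        while let Ok(snapshot) = snapshots.recv() {
            let bounds = (self.compute)(&snapshot);
            if bounds_tx.send(bounds).is_err() {
                eprintln!("{}: no receivers left, stopping", self.label);
                break;
            }
            forwarded += 1;
        }
        forwarded
    }
}

/// Placeholder snapshot: (lower, upper) power bounds per inverter.
pub struct PvSnapshot {
    pub inverter_bounds: Vec<(f64, f64)>,
}

/// Illustrative aggregation only: sums the per-inverter bounds.
pub fn compute_pool_bounds(snapshot: &PvSnapshot) -> Vec<Bounds<f64>> {
    if snapshot.inverter_bounds.is_empty() {
        return Vec::new();
    }
    let lower: f64 = snapshot.inverter_bounds.iter().map(|b| b.0).sum();
    let upper: f64 = snapshot.inverter_bounds.iter().map(|b| b.1).sum();
    vec![Bounds { lower, upper }]
}
```

A pool would construct it as `PoolBoundsTracker::new(compute_pool_bounds, "pv")`;
a plain `fn` pointer keeps the tracker free of closure-type generics.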

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`telem_with_power_bounds`, `handles`, and the `last_snapshot` broadcast drainer
were copy-pasted verbatim across the pool and bounds-tracker test modules.
Collect them into a shared `#[cfg(test)] test_support` module — `last_snapshot`
generic over the snapshot type — and have the test modules (and `new_pool`) use
them.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The run loop logged "Failed to send component status" on every sample and
every missing-data tick once the pool tracker dropped its mpsc receiver, but
never exited, leaking the task and its broadcast subscription for the life of
the process (scaling with each pool recreation). Break the loop on send
failure, matching the RecvError::Closed arm, so the tracker shuts down when
there is nothing left to report to.
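
The fix amounts to treating a failed send as the exit condition. A minimal
sketch, assuming `std::sync::mpsc` as a stand-in for the async channel and a
hypothetical `run_component_tracker` function with `u32` statuses:

```rust
use std::sync::mpsc::Sender;

/// Forward statuses until the input ends or the receiver is dropped.
/// Returns the number of statuses actually delivered.
pub fn run_component_tracker<I>(samples: I, status_tx: Sender<u32>) -> usize
where
    I: IntoIterator<Item = u32>,
{
    let mut delivered = 0;
    for sample in samples {
        // A send error means the receiving side is gone for good, so
        // stop instead of logging and looping forever.
        if status_tx.send(sample).is_err() {
            break;
        }
        delivered += 1;
    }
    delivered
}
```

With the receiver dropped, even an endless input returns immediately instead of
spinning for the life of the process.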

Fixes frequenz-floss#43.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`RecvError::Closed` means the upstream telemetry tracker dropped its sender,
which happens on a normal teardown of the pool — the bounds tracker has nothing
left to aggregate. Log it at debug rather than error, matching the
no-receivers path just above.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The tick arm skips `send()` whenever the partitioning is unchanged, but a failed
`send()` is the only thing that tells the tracker every receiver has gone. With
a stable partition (e.g. all components silent, each re-emitting the same
unhealthy status) the snapshot never changes, so the skip path is taken every
tick and the dropped receivers are never noticed — the tracker and its
component trackers leak until process exit.

Check `receiver_count()` each tick, before the unchanged-skip, and break when it
reaches zero. Applies to both the PV and battery pool telemetry trackers.
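
The ordering matters: the receiver check has to come before the unchanged-skip.
The sketch below models that with a hypothetical `SnapshotSink` trait and
`FakeSink` test double standing in for the broadcast sender; `on_tick` and
`TickOutcome` are likewise invented for illustration.

```rust
/// Hypothetical stand-in for the parts of a broadcast sender used per tick.
pub trait SnapshotSink<S> {
    fn receiver_count(&self) -> usize;
    fn send(&mut self, snapshot: S);
}

#[derive(Debug, PartialEq)]
pub enum TickOutcome {
    Stop,
    Skipped,
    Sent,
}

pub fn on_tick<S, K>(sink: &mut K, last_sent: &mut Option<S>, current: &S) -> TickOutcome
where
    S: Clone + PartialEq,
    K: SnapshotSink<S>,
{
    // Check first: with a stable partition the skip below is taken on every
    // tick, so this is the only place dropped receivers can be noticed.
    if sink.receiver_count() == 0 {
        return TickOutcome::Stop;
    }
    if last_sent.as_ref() == Some(current) {
        return TickOutcome::Skipped;
    }
    sink.send(current.clone());
    *last_sent = Some(current.clone());
    TickOutcome::Sent
}

/// Test double: a settable receiver count and a log of what was sent.
pub struct FakeSink<S> {
    pub receivers: usize,
    pub sent: Vec<S>,
}

impl<S> SnapshotSink<S> for FakeSink<S> {
    fn receiver_count(&self) -> usize {
        self.receivers
    }
    fn send(&mut self, snapshot: S) {
        self.sent.push(snapshot);
    }
}
```

If the two checks were swapped, an unchanged snapshot would return `Skipped`
forever and `Stop` would never be reached.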

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Both pool telemetry trackers now treat every exit from `run` as a normal
shutdown: the loop ends when every component tracker has gone or every receiver
has dropped, neither of which is an error. The trailing `error!` + `Err` return
became a `debug!`, and the in-loop send-failure no longer logs at error.

With shutdown no longer fallible, `run` has nothing left to report, so drop its
`Result` return (the call sites only `tokio::spawn` it and discard the result).
The remaining startup failures -- an empty component set and opening a
component's telemetry stream -- now log at their source before the task exits,
instead of being propagated into a discarded `JoinHandle`.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The group tracker logged at error and returned `Err` on three paths that are
all ordinary shutdown: its inverter or battery component trackers all exiting
(`recv` yields `None`), and the pool tracker dropping its receiver (the send to
it fails). The last one fired on every `BatteryPool` teardown, so each group
tracker spammed "Failed to send inverter-battery group status" at error level.

Log these at debug and just return, matching how the pool telemetry trackers
already treat their own shutdown. With shutdown no longer an error, `run` has
nothing left to fail with, so drop its `Result` return; the remaining startup
failures -- opening a component's telemetry stream -- now log at their source
before the task exits, instead of being propagated into a discarded
`JoinHandle`.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The per-component stream task exits once it sees no receivers and reports
`StreamStatus::Ended`, but the actor only removed the component from the
retry map — the now-dead `broadcast::Sender` stayed cached in
`component_streams`. Any later subscription for that component was handed
`tx.subscribe()` on that dead sender without a new tonic stream being
started, so it never received telemetry. With the telemetry trackers now
exiting cleanly instead of leaking (which used to hold receivers forever
and mask this), every drop-pool/rebuild cycle hit it: the rebuilt pool saw
its components as permanently silent.

On `Ended`, drop the cached sender so the next subscription starts a fresh
stream. If a subscriber arrived in the window between the stream task's
no-receivers check and the actor processing `Ended`, the entry still has
receivers — restart the stream for them instead of evicting it, since the
old task is already gone either way.
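
The decision on `Ended` can be modelled as below. This is a simplified model,
not the actor's code: `CachedStream`, its `receiver_count` and `generation`
fields, `EndedAction`, and `on_stream_ended` are hypothetical, and bumping
`generation` stands in for starting a new stream task.

```rust
use std::collections::HashMap;

type ComponentId = u64; // placeholder for the crate's ID type

/// Hypothetical cache entry; a real entry would hold the sender itself.
pub struct CachedStream {
    pub receiver_count: usize,
    pub generation: u32,
}

#[derive(Debug, PartialEq)]
pub enum EndedAction {
    Evicted,
    Restarted,
    NotCached,
}

pub fn on_stream_ended(
    streams: &mut HashMap<ComponentId, CachedStream>,
    id: ComponentId,
) -> EndedAction {
    let has_receivers = match streams.get(&id) {
        None => return EndedAction::NotCached,
        Some(entry) => entry.receiver_count > 0,
    };
    if has_receivers {
        // A subscriber raced in after the stream task decided to exit:
        // keep the entry and start a fresh stream for it.
        if let Some(entry) = streams.get_mut(&id) {
            entry.generation += 1;
        }
        EndedAction::Restarted
    } else {
        // Nobody is listening: drop the dead sender so the next
        // subscription starts from scratch.
        streams.remove(&id);
        EndedAction::Evicted
    }
}
```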

Fixes frequenz-floss#42.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`PvPoolSnapshot` and `InverterBatteryGroupStatus` each carried their own pair(s)
of `healthy`/`unhealthy` telemetry maps, and both telemetry trackers
open-coded the same insert-here / remove-there bookkeeping on every status
update.

Introduce `ComponentHealthPartition` — a `{ healthy, unhealthy }` pair with
`mark_healthy`/`mark_unhealthy` helpers — and rebuild both snapshot types on it
(`PvPoolSnapshot { inverters }`, `InverterBatteryGroupStatus { inverters,
batteries }`). The trackers now mutate partitions through the shared helpers,
and the PV tracker's unchanged-check compares whole partitions rather than
field by field, so a future field can't silently escape change detection.
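
A minimal sketch of the partition, assuming `BTreeMap` storage, a `u64`
placeholder for the component ID, and a generic telemetry type `T`; the real
field types may differ. Only the `healthy`/`unhealthy` pair, the two helper
names, and the `inverters` field come from this message.

```rust
use std::collections::BTreeMap;

type ComponentId = u64; // placeholder for the crate's ID type

#[derive(Debug, Clone, PartialEq)]
pub struct ComponentHealthPartition<T> {
    pub healthy: BTreeMap<ComponentId, T>,
    pub unhealthy: BTreeMap<ComponentId, T>,
}

impl<T> ComponentHealthPartition<T> {
    pub fn new() -> Self {
        Self { healthy: BTreeMap::new(), unhealthy: BTreeMap::new() }
    }

    /// Insert into `healthy` and make sure the ID is not also in `unhealthy`.
    pub fn mark_healthy(&mut self, id: ComponentId, telemetry: T) {
        self.unhealthy.remove(&id);
        self.healthy.insert(id, telemetry);
    }

    /// Insert into `unhealthy` and make sure the ID is not also in `healthy`.
    pub fn mark_unhealthy(&mut self, id: ComponentId, telemetry: T) {
        self.healthy.remove(&id);
        self.unhealthy.insert(id, telemetry);
    }
}

/// Snapshot rebuilt on the partition. Deriving `PartialEq` means the
/// unchanged-check is a single `==` that covers any field added later.
#[derive(Debug, Clone, PartialEq)]
pub struct PvPoolSnapshot<T> {
    pub inverters: ComponentHealthPartition<T>,
}
```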

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`BatteryPool::try_new` and `PvPool::try_new` carried the same two checks —
reject an empty explicit set and reject IDs that aren't all of the right kind —
differing only in the component kind and the noun used in error messages.

Extract them into `validate_pool_ids`, parameterised by the matching-ID set and
a noun, and call it from both pools.
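
One possible shape for the helper, as a sketch only: the signature, the
`PoolIdError` type, and the `u64` ID placeholder are assumptions. It takes the
optional explicit ID set, the set of IDs of the right kind, and the noun for
error messages.

```rust
use std::collections::BTreeSet;

type ComponentId = u64; // placeholder for the crate's ID type

/// Hypothetical error type; the crate's real errors may be shaped differently.
#[derive(Debug, PartialEq)]
pub enum PoolIdError {
    Empty { noun: &'static str },
    WrongKind { noun: &'static str, offending: Vec<ComponentId> },
}

pub fn validate_pool_ids(
    requested: Option<&BTreeSet<ComponentId>>,
    matching: &BTreeSet<ComponentId>,
    noun: &'static str,
) -> Result<(), PoolIdError> {
    // No explicit set was given, so there is nothing to validate.
    let Some(ids) = requested else {
        return Ok(());
    };
    // Check 1: an explicit set must not be empty.
    if ids.is_empty() {
        return Err(PoolIdError::Empty { noun });
    }
    // Check 2: every requested ID must be of the right kind.
    let offending: Vec<ComponentId> = ids.difference(matching).copied().collect();
    if !offending.is_empty() {
        return Err(PoolIdError::WrongKind { noun, offending });
    }
    Ok(())
}
```

Each pool would then pass its own ID set and noun, for example the PV inverter
IDs with `"PV inverter"`.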

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
`power_bounds` and `telemetry_snapshots` on both `BatteryPool` and `PvPool`
repeated the same weak-sender reuse gate — upgrade the stored `WeakSender`,
keep it only if it still has receivers, and subscribe — in four places. Pull it
into a single `try_reuse` helper so the reuse policy lives in one spot.
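
To make the gate concrete without pulling in an async runtime, the sketch below
defines tiny stand-ins for a broadcast `Sender`, `WeakSender`, and `Receiver`
that only track liveness and receiver count. These stand-in types and the
`try_reuse` signature are assumptions for illustration; the three steps
(upgrade, check for receivers, subscribe) are the ones described above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

// Stand-in channel: senders own `Inner`; receivers only share the counter,
// so a weak sender stops upgrading once every strong sender is gone.
struct Inner {
    receivers: Arc<AtomicUsize>,
}

#[derive(Clone)]
pub struct Sender(Arc<Inner>);
pub struct WeakSender(Weak<Inner>);
pub struct Receiver(Arc<AtomicUsize>);

impl Sender {
    pub fn new() -> Self {
        Sender(Arc::new(Inner { receivers: Arc::new(AtomicUsize::new(0)) }))
    }
    pub fn subscribe(&self) -> Receiver {
        self.0.receivers.fetch_add(1, Ordering::SeqCst);
        Receiver(Arc::clone(&self.0.receivers))
    }
    pub fn receiver_count(&self) -> usize {
        self.0.receivers.load(Ordering::SeqCst)
    }
    pub fn downgrade(&self) -> WeakSender {
        WeakSender(Arc::downgrade(&self.0))
    }
}

impl WeakSender {
    pub fn upgrade(&self) -> Option<Sender> {
        self.0.upgrade().map(Sender)
    }
}

impl Drop for Receiver {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

/// The reuse gate: hand out a new receiver only if the stored sender is
/// still alive and still has at least one receiver.
pub fn try_reuse(slot: &Option<WeakSender>) -> Option<Receiver> {
    let tx = slot.as_ref()?.upgrade()?;
    if tx.receiver_count() == 0 {
        return None;
    }
    Some(tx.subscribe())
}
```

A caller that gets `None` would fall back to creating a fresh channel and
tracker, then store the new weak sender in the slot.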

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>