Skip to content

Commit 770d768

Browse files
Extract abstract base classes from pools and pool ref stores (#1391)
This is preparation for the introduction of a new subclass for steam boiler pools. There was a lot of duplicated code between the three existing types of pools and their respective references stores, with just a bit of drift.
2 parents a84806b + dc9a076 commit 770d768

18 files changed

Lines changed: 496 additions & 638 deletions

examples/battery_pool.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ async def main() -> None:
3030
receivers = [
3131
battery_pool.soc.new_receiver(limit=1),
3232
battery_pool.capacity.new_receiver(limit=1),
33-
# pylint: disable-next=protected-access
34-
battery_pool._system_power_bounds.new_receiver(limit=1),
33+
battery_pool.system_power_bounds.new_receiver(limit=1),
3534
]
3635

3736
async for metric in merge(*receivers):

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ def new_battery_pool(
437437
self._battery_power_wrapper.distribution_results_fetcher()
438438
),
439439
min_update_interval=self._resampler_config.resampling_period,
440-
batteries_id=component_ids,
440+
component_ids=component_ids,
441441
)
442442
)
443443

src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,20 +149,17 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N
149149
battery_pool = _data_pipeline.new_battery_pool(
150150
priority=-sys.maxsize - 1, component_ids=component_ids
151151
)
152-
# pylint: disable-next=protected-access
153-
bounds_receiver = battery_pool._system_power_bounds.new_receiver()
152+
bounds_receiver = battery_pool.system_power_bounds.new_receiver()
154153
elif issubclass(self._component_class, EvCharger):
155154
ev_charger_pool = _data_pipeline.new_ev_charger_pool(
156155
priority=-sys.maxsize - 1, component_ids=component_ids
157156
)
158-
# pylint: disable-next=protected-access
159-
bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver()
157+
bounds_receiver = ev_charger_pool.system_power_bounds.new_receiver()
160158
elif issubclass(self._component_class, SolarInverter):
161159
pv_pool = _data_pipeline.new_pv_pool(
162160
priority=-sys.maxsize - 1, component_ids=component_ids
163161
)
164-
# pylint: disable-next=protected-access
165-
bounds_receiver = pv_pool._system_power_bounds.new_receiver()
162+
bounds_receiver = pv_pool.system_power_bounds.new_receiver()
166163
else:
167164
_logger.error(
168165
"PowerManagingActor: Unsupported component class: %s",

src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py

Lines changed: 29 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,17 @@
99
"""
1010

1111
import asyncio
12-
import uuid
13-
from collections import abc
1412

15-
from frequenz.client.common.microgrid.components import ComponentId
1613
from frequenz.quantities import Energy, Percentage, Power, Temperature
14+
from typing_extensions import override
1715

1816
from ... import timeseries
19-
from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher
20-
from ...microgrid import _power_distributing, _power_managing, connection_manager
17+
from ..._internal._channels import ReceiverFetcher
18+
from ...microgrid import _power_managing, connection_manager
2119
from ...timeseries import Sample
2220
from .._base_types import SystemBounds
23-
from ..formulas._formula import Formula
21+
from ..component_pool import ComponentPool
22+
from ..formulas import Formula
2423
from ._battery_pool_reference_store import BatteryPoolReferenceStore
2524
from ._methods import SendOnUpdate
2625
from ._metric_calculator import (
@@ -34,7 +33,7 @@
3433
# pylint: disable=protected-access
3534

3635

37-
class BatteryPool:
36+
class BatteryPool(ComponentPool[BatteryPoolReferenceStore, BatteryPoolReport]):
3837
"""An interface for interaction with pools of batteries.
3938
4039
Provides:
@@ -50,65 +49,6 @@ class BatteryPool:
5049
[propose_discharge][frequenz.sdk.timeseries.battery_pool.BatteryPool.propose_discharge].
5150
"""
5251

53-
def __init__(
54-
self,
55-
*,
56-
pool_ref_store: BatteryPoolReferenceStore,
57-
name: str | None,
58-
priority: int,
59-
):
60-
"""Create a BatteryPool instance.
61-
62-
!!! note
63-
`BatteryPool` instances are not meant to be created directly by users. Use
64-
the [`microgrid.new_battery_pool`][frequenz.sdk.microgrid.new_battery_pool]
65-
method for creating `BatteryPool` instances.
66-
67-
Args:
68-
pool_ref_store: The battery pool reference store instance.
69-
name: An optional name used to identify this instance of the pool or a
70-
corresponding actor in the logs.
71-
priority: The priority of the actor using this wrapper.
72-
"""
73-
self._pool_ref_store = pool_ref_store
74-
unique_id = str(uuid.uuid4())
75-
self._source_id = unique_id if name is None else f"{name}-{unique_id}"
76-
self._priority = priority
77-
78-
async def propose_power(
79-
self,
80-
power: Power | None,
81-
*,
82-
bounds: timeseries.Bounds[Power | None] = timeseries.Bounds(None, None),
83-
) -> None:
84-
"""Send a proposal to the power manager for the pool's set of batteries.
85-
86-
Power values need to follow the Passive Sign Convention (PSC). That is, positive
87-
values indicate charge power and negative values indicate discharge power.
88-
89-
Details on how the power manager handles proposals can be found in the
90-
[Microgrid][frequenz.sdk.microgrid--setting-power] documentation.
91-
92-
Args:
93-
power: The power to propose for the batteries in the pool. If `None`, this
94-
proposal will not have any effect on the target power, unless bounds are
95-
specified. When specified without bounds, bounds for lower priority
96-
actors will be shifted by this power. If both are `None`, it is
97-
equivalent to not having a proposal or withdrawing a previous one.
98-
bounds: The power bounds for the proposal. When specified, this will limit
99-
the bounds for lower priority actors.
100-
"""
101-
await self._pool_ref_store._power_manager_requests_sender.send(
102-
_power_managing.Proposal(
103-
source_id=self._source_id,
104-
preferred_power=power,
105-
bounds=bounds,
106-
component_ids=self._pool_ref_store._batteries,
107-
priority=self._priority,
108-
creation_time=asyncio.get_running_loop().time(),
109-
)
110-
)
111-
11252
async def propose_charge(self, power: Power | None) -> None:
11353
"""Set the given charge power for the batteries in the pool.
11454
@@ -133,12 +73,12 @@ async def propose_charge(self, power: Power | None) -> None:
13373
"""
13474
if power and power < Power.zero():
13575
raise ValueError("Charge power must be positive.")
136-
await self._pool_ref_store._power_manager_requests_sender.send(
76+
await self._pool_ref_store.power_manager_requests_sender.send(
13777
_power_managing.Proposal(
13878
source_id=self._source_id,
13979
preferred_power=power,
14080
bounds=timeseries.Bounds(None, None),
141-
component_ids=self._pool_ref_store._batteries,
81+
component_ids=self._pool_ref_store.component_ids,
14282
priority=self._priority,
14383
creation_time=asyncio.get_running_loop().time(),
14484
)
@@ -170,27 +110,19 @@ async def propose_discharge(self, power: Power | None) -> None:
170110
if power < Power.zero():
171111
raise ValueError("Discharge power must be positive.")
172112
power = -power
173-
await self._pool_ref_store._power_manager_requests_sender.send(
113+
await self._pool_ref_store.power_manager_requests_sender.send(
174114
_power_managing.Proposal(
175115
source_id=self._source_id,
176116
preferred_power=power,
177117
bounds=timeseries.Bounds(None, None),
178-
component_ids=self._pool_ref_store._batteries,
118+
component_ids=self._pool_ref_store.component_ids,
179119
priority=self._priority,
180120
creation_time=asyncio.get_running_loop().time(),
181121
)
182122
)
183123

184124
@property
185-
def component_ids(self) -> abc.Set[ComponentId]:
186-
"""Return ids of the batteries in the pool.
187-
188-
Returns:
189-
Ids of the batteries in the pool
190-
"""
191-
return self._pool_ref_store._batteries
192-
193-
@property
125+
@override
194126
def power(self) -> Formula[Power]:
195127
"""Fetch the total power of the batteries in the pool.
196128
@@ -206,10 +138,10 @@ def power(self) -> Formula[Power]:
206138
A Formula that will calculate and stream the total power of all
207139
batteries in the pool.
208140
"""
209-
return self._pool_ref_store._formula_pool.from_power_formula(
141+
return self._pool_ref_store.formula_pool.from_power_formula(
210142
"battery_pool_power",
211143
connection_manager.get().component_graph.battery_formula(
212-
self._pool_ref_store._batteries
144+
self._pool_ref_store.component_ids
213145
),
214146
)
215147

@@ -248,10 +180,12 @@ def soc(self) -> ReceiverFetcher[Sample[Percentage]]:
248180
batteries in the pool, considering only working batteries with
249181
operational inverters.
250182
"""
183+
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)
184+
251185
method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()
252186

253187
if method_name not in self._pool_ref_store._active_methods:
254-
calculator = SoCCalculator(self._pool_ref_store._batteries)
188+
calculator = SoCCalculator(self._pool_ref_store.component_ids)
255189
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
256190
metric_calculator=calculator,
257191
working_batteries=self._pool_ref_store._working_batteries,
@@ -268,9 +202,12 @@ def temperature(self) -> ReceiverFetcher[Sample[Temperature]]:
268202
A MetricAggregator that will calculate and stream the average temperature
269203
of all batteries in the pool.
270204
"""
205+
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)
206+
271207
method_name = SendOnUpdate.name() + "_" + TemperatureCalculator.name()
208+
272209
if method_name not in self._pool_ref_store._active_methods:
273-
calculator = TemperatureCalculator(self._pool_ref_store._batteries)
210+
calculator = TemperatureCalculator(self._pool_ref_store.component_ids)
274211
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
275212
metric_calculator=calculator,
276213
working_batteries=self._pool_ref_store._working_batteries,
@@ -305,10 +242,12 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]:
305242
batteries in the pool, considering only working batteries with
306243
operational inverters.
307244
"""
245+
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)
246+
308247
method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()
309248

310249
if method_name not in self._pool_ref_store._active_methods:
311-
calculator = CapacityCalculator(self._pool_ref_store._batteries)
250+
calculator = CapacityCalculator(self._pool_ref_store.component_ids)
312251
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
313252
metric_calculator=calculator,
314253
working_batteries=self._pool_ref_store._working_batteries,
@@ -317,59 +256,16 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]:
317256

318257
return self._pool_ref_store._active_methods[method_name]
319258

259+
@override
320260
@property
321-
def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
322-
"""Get a receiver to receive new power status reports when they change.
323-
324-
These include
325-
- the current inclusion/exclusion bounds available for the pool's priority,
326-
- the current target power for the pool's set of batteries,
327-
- the result of the last distribution request for the pool's set of batteries.
328-
329-
Returns:
330-
A receiver that will stream power status reports for the pool's priority.
331-
"""
332-
sub = _power_managing.ReportRequest(
333-
source_id=self._source_id,
334-
priority=self._priority,
335-
component_ids=self._pool_ref_store._batteries,
336-
)
337-
self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = (
338-
asyncio.create_task(
339-
self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub)
340-
)
341-
)
342-
channel = self._pool_ref_store._channel_registry.get_or_create(
343-
_power_managing._Report, sub.get_channel_name()
344-
)
345-
channel.resend_latest = True
346-
347-
return channel
348-
349-
@property
350-
def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]:
351-
"""Get a receiver to receive power distribution results.
352-
353-
Returns:
354-
A receiver that will stream power distribution results for the pool's set of
355-
batteries.
356-
"""
357-
return MappingReceiverFetcher(
358-
self._pool_ref_store._power_dist_results_fetcher,
359-
lambda recv: recv.filter(
360-
lambda x: x.request.component_ids == self._pool_ref_store._batteries
361-
),
362-
)
363-
364-
@property
365-
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
261+
def system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
366262
"""Get receiver to receive new power bounds when they change.
367263
368264
Power bounds refer to the min and max power that a battery can
369265
discharge or charge at and is also denoted as SoP.
370266
371267
Power bounds formulas are described in the receiver return type.
372-
None will be send if there is no component to calculate metrics.
268+
None will be sent if there is no component to calculate metrics.
373269
374270
A receiver from the MetricAggregator can be obtained by calling the
375271
`new_receiver` method.
@@ -378,23 +274,16 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
378274
A MetricAggregator that will calculate and stream the power bounds
379275
of all batteries in the pool.
380276
"""
277+
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)
278+
381279
method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()
382280

383281
if method_name not in self._pool_ref_store._active_methods:
384-
calculator = PowerBoundsCalculator(self._pool_ref_store._batteries)
282+
calculator = PowerBoundsCalculator(self._pool_ref_store.component_ids)
385283
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
386284
metric_calculator=calculator,
387285
working_batteries=self._pool_ref_store._working_batteries,
388286
min_update_interval=self._pool_ref_store._min_update_interval,
389287
)
390288

391289
return self._pool_ref_store._active_methods[method_name]
392-
393-
async def stop(self) -> None:
394-
"""Stop all tasks and channels owned by the BatteryPool."""
395-
# This was closing the pool_ref_store, which is not correct, because those are
396-
# shared.
397-
#
398-
# This method will do until we have a mechanism to track the resources created
399-
# through it. It can also eventually cleanup the pool_ref_store, when it is
400-
# holding the last reference to it.

0 commit comments

Comments
 (0)