Skip to content

Commit 39143cb

Browse files
committed
Extract abstract base classes from pools and pool ref stores
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
1 parent a84806b commit 39143cb

10 files changed

Lines changed: 440 additions & 603 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage a pool of components."""
5+
6+
from ._abstract_pool import AbstractPool
7+
from ._abstract_pool_reference_store import AbstractPoolReferenceStore
8+
9+
__all__ = [
10+
"AbstractPool",
11+
"AbstractPoolReferenceStore",
12+
]
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Abstract base class for component pools."""
5+
6+
import asyncio
7+
import uuid
8+
from abc import ABC, abstractmethod
9+
from collections import abc
10+
11+
from frequenz.client.common.microgrid.components import ComponentId
12+
from frequenz.quantities import Power
13+
14+
from frequenz.sdk._internal._channels import MappingReceiverFetcher, ReceiverFetcher
15+
from frequenz.sdk.microgrid import _power_distributing, _power_managing
16+
from frequenz.sdk.timeseries import Bounds
17+
from frequenz.sdk.timeseries._base_types import SystemBounds
18+
from frequenz.sdk.timeseries.abstract_pool._abstract_pool_reference_store import (
19+
AbstractPoolReferenceStore,
20+
)
21+
from frequenz.sdk.timeseries.formulas import Formula
22+
23+
24+
class AbstractPool(ABC):
25+
"""Abstract base class for component pools."""
26+
27+
def __init__( # pylint: disable=too-many-arguments
28+
self,
29+
*,
30+
pool_ref_store: AbstractPoolReferenceStore,
31+
name: str | None,
32+
priority: int,
33+
) -> None:
34+
"""Create an `AbstractPool` instance.
35+
36+
Args:
37+
pool_ref_store: The pool reference store instance.
38+
name: An optional name used to identify this instance of the pool or a
39+
corresponding actor in the logs.
40+
priority: The priority of the actor using this wrapper.
41+
"""
42+
self._pool_ref_store = pool_ref_store
43+
unique_id = str(uuid.uuid4())
44+
self._source_id = unique_id if name is None else f"{name}-{unique_id}"
45+
self._priority = priority
46+
47+
@property
48+
def component_ids(self) -> abc.Set[ComponentId]:
49+
"""Return component IDs of all component IDs managed by this pool.
50+
51+
Returns:
52+
Set of managed component IDs.
53+
"""
54+
return self._pool_ref_store.component_ids
55+
56+
async def propose_power(
57+
self,
58+
power: Power | None,
59+
bounds: Bounds[Power | None] = Bounds(None, None),
60+
) -> None:
61+
"""Send a proposal to the power manager for the pool's underlying components.
62+
63+
This proposal is for the maximum power that can be set for the components in
64+
the pool. The actual production or consumption might be lower.
65+
66+
Details on how the power manager handles proposals can be found in the
67+
[Microgrid][frequenz.sdk.microgrid--setting-power] documentation.
68+
69+
Args:
70+
power: The power to propose. If `None`,
71+
this proposal will not have any effect on the target power, unless
72+
bounds are specified. When specified without bounds, bounds for lower
73+
priority actors will be shifted by this power. If both are `None`, it
74+
is equivalent to not having a proposal or withdrawing a previous one.
75+
bounds: The power bounds for the proposal. When specified, these bounds will
76+
limit the bounds for lower priority actors.
77+
"""
78+
await self._pool_ref_store.power_manager_requests_sender.send(
79+
_power_managing.Proposal(
80+
source_id=self._source_id,
81+
preferred_power=power,
82+
bounds=bounds,
83+
component_ids=self._pool_ref_store.component_ids,
84+
priority=self._priority,
85+
creation_time=asyncio.get_running_loop().time(),
86+
)
87+
)
88+
89+
@property
90+
@abstractmethod
91+
def power(self) -> Formula[Power]:
92+
"""Fetch the total power for the components in the pool.
93+
94+
Returns:
95+
A Formula that will calculate and stream the total power of all
96+
components in the pool.
97+
"""
98+
99+
@property
100+
def power_status(self) -> ReceiverFetcher[_power_managing._Report]:
101+
"""Get a receiver to receive new power status reports when they change.
102+
103+
These include
104+
- the current inclusion/exclusion bounds available for the pool's priority,
105+
- the current target power for the pool's set of components,
106+
- the result of the last distribution request for the pool's set of components,.
107+
108+
Returns:
109+
A receiver that will stream power status reports for the pool's priority.
110+
"""
111+
sub = _power_managing.ReportRequest(
112+
source_id=self._source_id,
113+
priority=self._priority,
114+
component_ids=self._pool_ref_store.component_ids,
115+
)
116+
self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = (
117+
asyncio.create_task(
118+
self._pool_ref_store.power_manager_bounds_subs_sender.send(sub)
119+
)
120+
)
121+
channel = self._pool_ref_store.channel_registry.get_or_create(
122+
_power_managing._Report, # pylint: disable=protected-access
123+
sub.get_channel_name(),
124+
)
125+
channel.resend_latest = True
126+
127+
return channel
128+
129+
@property
130+
def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]:
131+
"""Get a receiver to receive power distribution results.
132+
133+
Returns:
134+
A receiver that will stream power distribution results for the pool's set of
135+
components.
136+
"""
137+
return MappingReceiverFetcher(
138+
self._pool_ref_store.power_distribution_results_fetcher,
139+
lambda recv: recv.filter(
140+
lambda x: x.request.component_ids == self._pool_ref_store.component_ids
141+
),
142+
)
143+
144+
@property
145+
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
146+
"""Return a receiver fetcher for the system power bounds."""
147+
return self._pool_ref_store.bounds_channel
148+
149+
async def stop(self) -> None:
150+
"""Stop all tasks and channels owned by the pool."""
151+
# This was closing the pool_ref_store, which is not correct, because those are
152+
# shared.
153+
#
154+
# This method will do until we have a mechanism to track the resources created
155+
# through it. It can also eventually cleanup the pool_ref_store, when it is
156+
# holding the last reference to it.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Abstract base class for pool reference stores."""
5+
6+
import asyncio
7+
from abc import ABC, abstractmethod
8+
from collections import abc
9+
from typing import Type
10+
11+
from frequenz.channels import Broadcast, Receiver, Sender
12+
from frequenz.client.common.microgrid.components import ComponentId
13+
from frequenz.client.microgrid.component import Component
14+
15+
from frequenz.sdk._internal._channels import ChannelRegistry, ReceiverFetcher
16+
from frequenz.sdk.actor import BackgroundService
17+
from frequenz.sdk.microgrid import connection_manager
18+
from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest
19+
from frequenz.sdk.microgrid._power_distributing import ComponentPoolStatus, Result
20+
from frequenz.sdk.microgrid._power_managing import Proposal, ReportRequest
21+
from frequenz.sdk.timeseries._base_types import SystemBounds
22+
from frequenz.sdk.timeseries.formulas._formula_pool import FormulaPool
23+
24+
25+
class AbstractPoolReferenceStore(ABC):
26+
"""Abstract base class for pool reference stores."""
27+
28+
def __init__( # pylint: disable=too-many-arguments
29+
self,
30+
*,
31+
channel_registry: ChannelRegistry,
32+
resampler_subscription_sender: Sender[ComponentMetricRequest],
33+
status_receiver: Receiver[ComponentPoolStatus],
34+
power_manager_requests_sender: Sender[Proposal],
35+
power_manager_bounds_subs_sender: Sender[ReportRequest],
36+
power_distribution_results_fetcher: ReceiverFetcher[Result],
37+
component_ids: abc.Set[ComponentId] | None = None,
38+
):
39+
"""Initialize this instance.
40+
41+
Args:
42+
channel_registry: A channel registry instance shared with the resampling
43+
actor.
44+
resampler_subscription_sender: A sender for sending metric requests to the
45+
resampling actor.
46+
status_receiver: A receiver that streams the status of the components in
47+
the pool.
48+
power_manager_requests_sender: A Channel sender for sending power
49+
requests to the power managing actor.
50+
power_manager_bounds_subs_sender: A Channel sender for sending power bounds
51+
subscription requests to the power managing actor.
52+
power_distribution_results_fetcher: A ReceiverFetcher for the results from
53+
the power distributing actor.
54+
component_ids: An optional list of component_ids belonging to this pool. If
55+
not specified, IDs of all components of the components type of this pool
56+
in the microgrid will be fetched from the component graph.
57+
58+
Raises:
59+
ValueError: If any of the provided component_ids are not of correct type or
60+
are unknown to the component graph.
61+
"""
62+
self.channel_registry = channel_registry
63+
self.resampler_subscription_sender = resampler_subscription_sender
64+
self.status_receiver = status_receiver
65+
self.power_manager_requests_sender = power_manager_requests_sender
66+
self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender
67+
self.power_distribution_results_fetcher = power_distribution_results_fetcher
68+
69+
graph = connection_manager.get().component_graph
70+
all_components = frozenset(
71+
{
72+
inv.id
73+
for inv in graph.components(matching_types=self.get_component_class())
74+
}
75+
)
76+
77+
if component_ids is not None:
78+
self.component_ids: frozenset[ComponentId] = frozenset(component_ids)
79+
if not self.component_ids.issubset(all_components):
80+
unknown_ids = self.component_ids - all_components
81+
raise ValueError(
82+
f"Unable to create {self.get_pool_type_name()}. These component IDs "
83+
+ f"are either not {self.get_component_type_name_plural()} or are unknown: "
84+
+ f"{unknown_ids}"
85+
)
86+
else:
87+
self.component_ids = all_components
88+
89+
self.power_bounds_subs: dict[str, asyncio.Task[None]] = {}
90+
91+
self.namespace = self.get_namespace()
92+
self.formula_pool = FormulaPool(
93+
self.namespace,
94+
self.channel_registry,
95+
self.resampler_subscription_sender,
96+
)
97+
self.bounds_channel: Broadcast[SystemBounds] = Broadcast(
98+
name=f"System Bounds for {self.get_component_type_name_plural()}: {self.component_ids}",
99+
resend_latest=True,
100+
)
101+
102+
self.bounds_tracker: BackgroundService | None = None
103+
self.create_bounds_tracker()
104+
105+
@staticmethod
106+
@abstractmethod
107+
def get_component_class() -> Type[Component]:
108+
"""Class of the component type."""
109+
110+
@staticmethod
111+
@abstractmethod
112+
def get_pool_type_name() -> str:
113+
"""Name of the pool type, for display purposes."""
114+
115+
@staticmethod
116+
@abstractmethod
117+
def get_component_type_name_plural() -> str:
118+
"""Name of the component type, for display purposes."""
119+
120+
@abstractmethod
121+
def get_namespace(self) -> str:
122+
"""Namespace to use with the data pipeline."""
123+
124+
@abstractmethod
125+
def create_bounds_tracker(self) -> None:
126+
"""Create the bounds tracker for the pool."""

0 commit comments

Comments
 (0)