Skip to content

Commit a7a2f56

Browse files
committed
Support grouping by keys in LatestValueCache
`LatestValueCache` now takes an optional `key` function. When specified, it is used to get the key for each incoming message, and the latest value for each key is cached and can be retrieved separately. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent f7fb341 commit a7a2f56

1 file changed

Lines changed: 80 additions & 5 deletions

File tree

src/frequenz/channels/_latest_value_cache.py

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@
88
99
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
1010
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11-
value received by that receiver. As soon as a value is received, its
11+
value received by that receiver. It also takes an optional `key` function
12+
that allows you to group the values by a specific key. If the `key` is
13+
provided, the cache will store the latest value for each key separately,
14+
otherwise it will store only the latest value received overall.
15+
16+
As soon as a value is received, its
1217
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
1318
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
1419
the latest value received. The `get` method will raise an exception if called
1520
before any messages have been received from the receiver.
1621
22+
Both `has_value` and `get` methods can take an optional `key` argument to
23+
check or retrieve the latest value for that specific key.
24+
1725
Example:
1826
```python
1927
from frequenz.channels import Broadcast, LatestValueCache
@@ -38,6 +46,7 @@
3846
from ._receiver import Receiver
3947

4048
T_co = typing.TypeVar("T_co", covariant=True)
49+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
4150

4251

4352
class _Sentinel:
@@ -48,15 +57,56 @@ def __str__(self) -> str:
4857
return "<no value received yet>"
4958

5059

51-
class LatestValueCache(typing.Generic[T_co]):
60+
class LatestValueCache(typing.Generic[T_co, HashableT]):
5261
"""A cache that stores the latest value in a receiver.
5362
5463
It provides a way to look up the latest value in a stream without any delay,
5564
as long as there has been one value received.
5665
"""
5766

67+
@typing.overload
68+
def __init__(
69+
self: "LatestValueCache[T_co, None]",
70+
receiver: Receiver[T_co],
71+
*,
72+
unique_id: str | None = None,
73+
key: None = None,
74+
) -> None:
75+
"""Create a new cache that does not use keys.
76+
77+
Args:
78+
receiver: The receiver to cache.
79+
unique_id: A string to help uniquely identify this instance. If not
80+
provided, a unique identifier will be generated from the object's
81+
[`id()`][id]. It is used mostly for debugging purposes.
82+
key: This parameter is ignored when set to `None`.
83+
"""
84+
85+
@typing.overload
86+
def __init__(
87+
self: "LatestValueCache[T_co, HashableT]",
88+
receiver: Receiver[T_co],
89+
*,
90+
unique_id: str | None = None,
91+
key: typing.Callable[[T_co], HashableT],
92+
) -> None:
93+
"""Create a new cache that uses keys.
94+
95+
Args:
96+
receiver: The receiver to cache.
97+
unique_id: A string to help uniquely identify this instance. If not
98+
provided, a unique identifier will be generated from the object's
99+
[`id()`][id]. It is used mostly for debugging purposes.
100+
key: A function that takes a value and returns a key to group the values by.
101+
If provided, the cache will store the latest value for each key separately.
102+
"""
103+
58104
def __init__(
59-
self, receiver: Receiver[T_co], *, unique_id: str | None = None
105+
self,
106+
receiver: Receiver[T_co],
107+
*,
108+
unique_id: str | None = None,
109+
key: typing.Callable[[T_co], typing.Any] | None = None,
60110
) -> None:
61111
"""Create a new cache.
62112
@@ -65,10 +115,16 @@ def __init__(
65115
unique_id: A string to help uniquely identify this instance. If not
66116
provided, a unique identifier will be generated from the object's
67117
[`id()`][id]. It is used mostly for debugging purposes.
118+
key: An optional function that takes a value and returns a key to group the
119+
values by. If provided, the cache will store the latest value for each
120+
key separately. If not provided, it will store only the latest value
121+
received overall.
68122
"""
69123
self._receiver = receiver
124+
self._key: typing.Callable[[T_co], HashableT] | None = key
70125
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
71126
self._latest_value: T_co | _Sentinel = _Sentinel()
127+
self._latest_value_by_key: dict[HashableT, T_co] = {}
72128
self._task = asyncio.create_task(
73129
self._run(), name=f"LatestValueCache«{self._unique_id}»"
74130
)
@@ -78,34 +134,53 @@ def unique_id(self) -> str:
78134
"""The unique identifier of this instance."""
79135
return self._unique_id
80136

81-
def get(self) -> T_co:
137+
def get(self, key: HashableT | None = None) -> T_co:
82138
"""Return the latest value that has been received.
83139
84140
This raises a `ValueError` if no value has been received yet. Use `has_value` to
85141
check whether a value has been received yet, before trying to access the value,
86142
to avoid the exception.
87143
144+
Args:
145+
key: An optional key to retrieve the latest value for that key. If not
146+
provided, it retrieves the latest value received overall.
147+
88148
Returns:
89149
The latest value that has been received.
90150
91151
Raises:
92152
ValueError: If no value has been received yet.
93153
"""
154+
if key is not None:
155+
if key not in self._latest_value_by_key:
156+
raise ValueError(f"No value received for key: {key!r}")
157+
return self._latest_value_by_key[key]
158+
94159
if isinstance(self._latest_value, _Sentinel):
95160
raise ValueError("No value has been received yet.")
96161
return self._latest_value
97162

98-
def has_value(self) -> bool:
163+
def has_value(self, key: HashableT | None = None) -> bool:
99164
"""Check whether a value has been received yet.
100165
166+
If `key` is provided, it checks whether a value has been received for that key.
167+
168+
Args:
169+
key: An optional key to check if a value has been received for that key.
170+
101171
Returns:
102172
`True` if a value has been received, `False` otherwise.
103173
"""
174+
if key is not None:
175+
return key in self._latest_value_by_key
104176
return not isinstance(self._latest_value, _Sentinel)
105177

106178
async def _run(self) -> None:
107179
async for value in self._receiver:
108180
self._latest_value = value
181+
if self._key is not None:
182+
key = self._key(value)
183+
self._latest_value_by_key[key] = value
109184

110185
async def stop(self) -> None:
111186
"""Stop the cache."""

0 commit comments

Comments
 (0)