forked from frequenz-floss/frequenz-channels-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_grouping_latest_value_cache.py
More file actions
226 lines (176 loc) · 7.77 KB
/
Copy path_grouping_latest_value_cache.py
File metadata and controls
226 lines (176 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. In addition, it provides a
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
check if a value has been received for a specific key, and a
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
the cached value for a specific key.
Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache
channel = Broadcast[tuple[int, str]](name="lvc_test")
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()
assert not cache.has_value(6)
await sender.send((6, "twenty-six"))
assert cache.has_value(6)
assert cache.get(6) == (6, "twenty-six")
```
"""
import asyncio
import typing
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView
from typing_extensions import override
from .._receiver import Receiver
T_co = typing.TypeVar("T_co", covariant=True)
T = typing.TypeVar("T")
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
class GroupingLatestValueCache(Mapping[HashableT, T_co]):
"""A cache that stores the latest value in a receiver, grouped by key."""
def __init__(
self,
receiver: Receiver[T_co],
*,
key: typing.Callable[[T_co], HashableT],
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Args:
receiver: The receiver to cache values from.
key: An function that takes a value and returns a key to group the values
by.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[T_co] = receiver
self._key: typing.Callable[[T_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id
@override
def keys(self) -> KeysView[HashableT]:
"""Return the set of keys for which values have been received.
If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()
@override
def items(self) -> ItemsView[HashableT, T_co]:
"""Return an iterator over the key-value pairs of the latest values received."""
return self._latest_value_by_key.items()
@override
def values(self) -> ValuesView[T_co]:
"""Return an iterator over the latest values received."""
return self._latest_value_by_key.values()
@typing.overload
def get(self, key: HashableT, default: None = None) -> T_co | None:
"""Return the latest value that has been received for a specific key."""
# MyPy passes this overload as a valid signature, but pylint does not like it.
@typing.overload
def get( # pylint: disable=signature-differs
self, key: HashableT, default: T
) -> T_co | T:
"""Return the latest value that has been received for a specific key."""
@override
def get(self, key: HashableT, default: T | None = None) -> T_co | T | None:
"""Return the latest value that has been received.
Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.
default: The default value to return if no value has been received yet for
the specified key. If not provided, it defaults to `None`.
Returns:
The latest value that has been received.
"""
return self._latest_value_by_key.get(key, default)
@override
def __iter__(self) -> Iterator[HashableT]:
"""Return an iterator over the keys for which values have been received."""
return iter(self._latest_value_by_key)
@override
def __len__(self) -> int:
"""Return the number of keys for which values have been received."""
return len(self._latest_value_by_key)
@override
def __getitem__(self, key: HashableT) -> T_co:
"""Return the latest value that has been received for a specific key.
Args:
key: The key to retrieve the latest value for.
Returns:
The latest value that has been received for that key.
Raises:
KeyError: If no value has been received yet for that key.
"""
if key not in self._latest_value_by_key:
raise KeyError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]
@override
def __contains__(self, key: object, /) -> bool:
"""Check if a value has been received for a specific key.
Args:
key: The key to check for.
Returns:
`True` if a value has been received for that key, `False` otherwise.
"""
return key in self._latest_value_by_key
@override
def __eq__(self, other: object, /) -> bool:
"""Check if this cache is equal to another object.
Two caches are considered equal if they have the same keys and values.
Args:
other: The object to compare with.
Returns:
`True` if the caches are equal, `False` otherwise.
"""
if not isinstance(other, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key == other._latest_value_by_key
@override
def __ne__(self, value: object, /) -> bool:
"""Check if this cache is not equal to another object.
Args:
value: The object to compare with.
Returns:
`True` if the caches are not equal, `False` otherwise.
"""
if not isinstance(value, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key != value._latest_value_by_key
def clear(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.
Args:
key: The key for which to clear the latest value.
"""
_ = self._latest_value_by_key.pop(key, None)
async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)
async def _run(self) -> None:
async for value in self._receiver:
key = self._key(value)
self._latest_value_by_key[key] = value