-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path_grouping_latest_value_cache.py
More file actions
301 lines (235 loc) · 10.4 KB
/
Copy path_grouping_latest_value_cache.py
File metadata and controls
301 lines (235 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key."""
import asyncio
from collections.abc import (
Callable,
Hashable,
ItemsView,
Iterator,
KeysView,
Mapping,
ValuesView,
)
from typing import TypeVar, overload
from typing_extensions import override
from .._receiver import Receiver
# Covariant so it can be used as the value parameter of `Mapping`, which is itself
# covariant in its value type.
ValueT_co = TypeVar("ValueT_co", covariant=True)
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
# Unconstrained: the fallback passed to `get()` / `pop()` may be of any type.
DefaultT = TypeVar("DefaultT")
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
# Bound to `Hashable` because the keys are used to index the cache's backing `dict`.
HashableT = TypeVar("HashableT", bound=Hashable)
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
class _NotSpecified:
    """A sentinel value to indicate that no default value was provided.

    A dedicated type is used instead of `None` so that `None` itself can be passed
    as an explicit default value and still be told apart from "no default given".
    Callers detect the sentinel with `isinstance(value, _NotSpecified)`.
    """

    def __repr__(self) -> str:
        """Return a string representation of this sentinel."""
        return "<_NotSpecified>"
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
"""A cache that stores the latest value in a receiver, grouped by key.
It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. Additionally other methods from
[`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
methods removing items from the cache are allowed, such as
[`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
[`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
[`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
[`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
Other update methods are not provided because the user should not update the
cache values directly.
Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache
channel = Broadcast[tuple[int, str]](name="lvc_test")
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()
assert cache.get(6) is None
assert 6 not in cache
await sender.send((6, "twenty-six"))
assert 6 in cache
assert cache.get(6) == (6, "twenty-six")
del cache[6]
assert cache.get(6) is None
assert 6 not in cache
await cache.stop()
```
"""
def __init__(
self,
receiver: Receiver[ValueT_co],
*,
key: Callable[[ValueT_co], HashableT],
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Args:
receiver: The receiver to cache values from.
key: An function that takes a value and returns a key to group the values
by.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[ValueT_co] = receiver
self._key: Callable[[ValueT_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id
@override
def keys(self) -> KeysView[HashableT]:
"""Return the set of keys for which values have been received.
If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()
@override
def items(self) -> ItemsView[HashableT, ValueT_co]:
"""Return an iterator over the key-value pairs of the latest values received."""
return self._latest_value_by_key.items()
@override
def values(self) -> ValuesView[ValueT_co]:
"""Return an iterator over the latest values received."""
return self._latest_value_by_key.values()
@overload
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
"""Return the latest value that has been received for a specific key."""
# MyPy passes this overload as a valid signature, but pylint does not like it.
@overload
def get( # pylint: disable=signature-differs
self, key: HashableT, default: DefaultT
) -> ValueT_co | DefaultT:
"""Return the latest value that has been received for a specific key."""
@override
def get(
self, key: HashableT, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None:
"""Return the latest value that has been received.
Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.
default: The default value to return if no value has been received yet for
the specified key. If not provided, it defaults to `None`.
Returns:
The latest value that has been received.
"""
return self._latest_value_by_key.get(key, default)
@override
def __iter__(self) -> Iterator[HashableT]:
"""Return an iterator over the keys for which values have been received."""
return iter(self._latest_value_by_key)
@override
def __len__(self) -> int:
"""Return the number of keys for which values have been received."""
return len(self._latest_value_by_key)
@override
def __getitem__(self, key: HashableT) -> ValueT_co:
"""Return the latest value that has been received for a specific key.
Args:
key: The key to retrieve the latest value for.
Returns:
The latest value that has been received for that key.
"""
return self._latest_value_by_key[key]
@override
def __contains__(self, key: object, /) -> bool:
"""Check if a value has been received for a specific key.
Args:
key: The key to check for.
Returns:
`True` if a value has been received for that key, `False` otherwise.
"""
return key in self._latest_value_by_key
@override
def __eq__(self, other: object, /) -> bool:
"""Check if this cache is equal to another object.
Two caches are considered equal if they have the same keys and values.
Args:
other: The object to compare with.
Returns:
`True` if the caches are equal, `False` otherwise.
"""
match other:
case GroupingLatestValueCache():
return self._latest_value_by_key == other._latest_value_by_key
case Mapping():
return self._latest_value_by_key == other
case _:
return NotImplemented
@override
def __ne__(self, value: object, /) -> bool:
"""Check if this cache is not equal to another object.
Args:
value: The object to compare with.
Returns:
`True` if the caches are not equal, `False` otherwise.
"""
return not self.__eq__(value)
def __delitem__(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.
Args:
key: The key for which to clear the latest value.
"""
del self._latest_value_by_key[key]
@overload
def pop(self, key: HashableT, /) -> ValueT_co | None:
"""Remove the latest value for a specific key and return it."""
@overload
def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
"""Remove the latest value for a specific key and return it."""
def pop(
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
"""Remove the latest value for a specific key and return it.
If no value has been received yet for that key, it returns the default value or
raises a `KeyError` if no default value is provided.
Args:
key: The key for which to remove the latest value.
default: The default value to return if no value has been received yet for
the specified key.
Returns:
The latest value that has been received for that key, or the default value if
no value has been received yet and a default value is provided.
"""
if isinstance(default, _NotSpecified):
return self._latest_value_by_key.pop(key)
return self._latest_value_by_key.pop(key, default)
def popitem(self) -> tuple[HashableT, ValueT_co]:
"""Remove and return a (key, value) pair from the cache.
Pairs are returned in LIFO (last-in, first-out) order.
Returns:
A tuple containing the key and the latest value that has been received for
that key.
"""
return self._latest_value_by_key.popitem()
def clear(self) -> None:
"""Clear all entries from the cache."""
self._latest_value_by_key.clear()
async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)
async def _run(self) -> None:
async for value in self._receiver:
key = self._key(value)
self._latest_value_by_key[key] = value