forked from frequenz-floss/frequenz-channels-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_latest_value_cache.py
More file actions
238 lines (187 loc) · 8.5 KB
/
Copy path_latest_value_cache.py
File metadata and controls
238 lines (187 loc) · 8.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
"""The LatestValueCache caches the latest value in a receiver.
It provides a way to look up the latest value in a stream whenever required, as
long as there has been one value received.
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
value received by that receiver. It also takes an optional `key` function
that allows you to group the values by a specific key. If the `key` is
provided, the cache will store the latest value for each key separately,
otherwise it will store only the latest value received overall.
As soon as a value is received, its
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
the latest value received. The `get` method will raise an exception if called
before any messages have been received from the receiver.
Both `has_value` and `get` methods can take an optional `key` argument to
check or retrieve the latest value for that specific key.
Example:
```python
from frequenz.channels import Broadcast, LatestValueCache
channel = Broadcast[int](name="lvc_test")
cache = LatestValueCache(channel.new_receiver())
sender = channel.new_sender()
assert not cache.has_value()
await sender.send(5)
assert cache.has_value()
assert cache.get() == 5
```
"""
from __future__ import annotations
import asyncio
import typing
from collections.abc import Set
from ._receiver import Receiver
T_co = typing.TypeVar("T_co", covariant=True)
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
class Sentinel:
"""A sentinel to denote that no value has been received yet."""
def __init__(self, desc: str) -> None:
"""Initialize the sentinel."""
self._desc = desc
def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return f"<Sentinel: {self._desc}>"
NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided")
NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided")
NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet")
class LatestValueCache(typing.Generic[T_co, HashableT]):
"""A cache that stores the latest value in a receiver.
It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""
@typing.overload
def __init__(
self: LatestValueCache[T_co, Sentinel],
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: Sentinel = NO_KEY_FUNCTION,
) -> None:
"""Create a new cache that does not use keys.
Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: This parameter is ignored when set to `None`.
"""
@typing.overload
def __init__(
self: LatestValueCache[T_co, HashableT],
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: typing.Callable[[T_co], HashableT],
) -> None:
"""Create a new cache that uses keys.
Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: A function that takes a value and returns a key to group the values by.
If provided, the cache will store the latest value for each key separately.
"""
def __init__(
self,
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION,
) -> None:
"""Create a new cache.
Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: An optional function that takes a value and returns a key to group the
values by. If provided, the cache will store the latest value for each
key separately. If not provided, it will store only the latest value
received overall.
"""
self._receiver = receiver
self._key: typing.Callable[[T_co], HashableT] | Sentinel = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._task = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id
def keys(self) -> Set[HashableT]:
"""Return the set of keys for which values have been received.
If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()
def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co:
"""Return the latest value that has been received.
This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.
Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.
Returns:
The latest value that has been received.
Raises:
ValueError: If no value has been received yet.
"""
if not isinstance(key, Sentinel):
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]
if isinstance(self._latest_value, Sentinel):
raise ValueError("No value has been received yet.")
return self._latest_value
def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool:
"""Check whether a value has been received yet.
If `key` is provided, it checks whether a value has been received for that key.
Args:
key: An optional key to check if a value has been received for that key.
Returns:
`True` if a value has been received, `False` otherwise.
"""
if not isinstance(key, Sentinel):
return key in self._latest_value_by_key
return not isinstance(self._latest_value, Sentinel)
def clear(self, key: HashableT | Sentinel = NO_KEY) -> None:
"""Clear the latest value or the latest value for a specific key.
If `key` is provided, it clears the latest value for that key. If no key is
provided, it clears the latest value received overall.
Args:
key: An optional key to clear the latest value for that key. If not
provided, it clears the latest value received overall.
"""
if not isinstance(key, Sentinel):
_ = self._latest_value_by_key.pop(key, None)
return
self._latest_value = NO_VALUE_RECEIVED
async def _run(self) -> None:
async for value in self._receiver:
self._latest_value = value
if not isinstance(self._key, Sentinel):
key = self._key(value)
self._latest_value_by_key[key] = value
async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<LatestValueCache latest_value={self._latest_value!r}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)
def __str__(self) -> str:
"""Return the last value seen by this cache."""
return str(self._latest_value)