forked from frequenz-floss/frequenz-channels-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_grouping_latest_value_cache.py
More file actions
155 lines (119 loc) · 5.33 KB
/
Copy path_grouping_latest_value_cache.py
File metadata and controls
155 lines (119 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.
As soon as a value is received for a `key`, the
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
method for that `key` returns the latest value received. The `get` method will raise an
exception if called before any messages have been received from the receiver for a given
`key`.
Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache
channel = Broadcast[tuple[int, str]](name="lvc_test")
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()
assert not cache.has_value(6)
await sender.send((6, "twenty-six"))
assert cache.has_value(6)
assert cache.get(6) == (6, "twenty-six")
```
"""
import asyncio
import typing
from collections.abc import Set
from .._receiver import Receiver
T_co = typing.TypeVar("T_co", covariant=True)
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
"""A cache that stores the latest value in a receiver.
It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""
def __init__(
self,
receiver: Receiver[T_co],
key: typing.Callable[[T_co], typing.Any],
*,
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Args:
receiver: The receiver to cache values from.
key: An function that takes a value and returns a key to group the values
by.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[T_co] = receiver
self._key: typing.Callable[[T_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id
def keys(self) -> Set[HashableT]:
"""Return the set of keys for which values have been received.
If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()
def get(self, key: HashableT) -> T_co:
"""Return the latest value that has been received.
This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.
Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.
Returns:
The latest value that has been received.
Raises:
ValueError: If no value has been received yet.
"""
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]
def has_value(self, key: HashableT) -> bool:
"""Check whether a value has been received yet.
If `key` is provided, it checks whether a value has been received for that key.
Args:
key: An optional key to check if a value has been received for that key.
Returns:
`True` if a value has been received, `False` otherwise.
"""
return key in self._latest_value_by_key
def clear(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.
Args:
key: The key for which to clear the latest value.
"""
_ = self._latest_value_by_key.pop(key, None)
async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)
async def _run(self) -> None:
async for value in self._receiver:
key = self._key(value)
self._latest_value_by_key[key] = value