Skip to content

Commit 5174e6c

Browse files
authored
Add the oneshot channel implementation (#502)
2 parents f29ee59 + 9c7a4b2 commit 5174e6c

8 files changed

Lines changed: 301 additions & 6 deletions

File tree

RELEASE_NOTES.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
## New Features
1313

14-
<!-- Here goes the main new features and examples or instructions on how to use them -->
14+
- There's a new `Oneshot` channel, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received.
15+
16+
- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed.
1517

1618
## Bug Fixes
1719

src/frequenz/channels/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from ._latest_value_cache import LatestValueCache
9494
from ._merge import Merger, merge
95+
from ._oneshot import OneshotChannel, OneshotReceiver, OneshotSender
9596
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9697
from ._select import (
9798
Selected,
@@ -100,7 +101,7 @@
100101
select,
101102
selected_from,
102103
)
103-
from ._sender import Sender, SenderError
104+
from ._sender import Sender, SenderClosedError, SenderError
104105

105106
__all__ = [
106107
"Anycast",
@@ -113,13 +114,17 @@
113114
"LatestValueCache",
114115
"MappedMessageT_co",
115116
"Merger",
117+
"OneshotChannel",
118+
"OneshotReceiver",
119+
"OneshotSender",
116120
"Receiver",
117121
"ReceiverError",
118122
"ReceiverMessageT_co",
119123
"ReceiverStoppedError",
120124
"SelectError",
121125
"Selected",
122126
"Sender",
127+
"SenderClosedError",
123128
"SenderError",
124129
"SenderMessageT_co",
125130
"SenderMessageT_contra",

src/frequenz/channels/_anycast.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ._exceptions import ChannelClosedError
1616
from ._generic import ChannelMessageT
1717
from ._receiver import Receiver, ReceiverStoppedError
18-
from ._sender import Sender, SenderError
18+
from ._sender import Sender, SenderClosedError, SenderError
1919

2020
_logger = logging.getLogger(__name__)
2121

@@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
327327
self._channel: Anycast[_T] = channel
328328
"""The channel that this sender belongs to."""
329329

330+
self._closed: bool = False
331+
"""Whether the sender is closed."""
332+
330333
@override
331334
async def send(self, message: _T, /) -> None:
332335
"""Send a message across the channel.
@@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
343346
SenderError: If the underlying channel was closed.
344347
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
345348
set as the cause.
349+
SenderClosedError: If this sender was closed.
346350
"""
351+
if self._closed:
352+
raise SenderClosedError(self)
353+
347354
# pylint: disable=protected-access
348355
if self._channel._closed:
349356
raise SenderError("The channel was closed", self) from ChannelClosedError(
@@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
367374
self._channel._recv_cv.notify(1)
368375
# pylint: enable=protected-access
369376

377+
@override
378+
async def aclose(self) -> None:
379+
"""Close this sender.
380+
381+
After closing, the sender will not be able to send any more messages. Any
382+
attempt to send a message through a closed sender will raise a
383+
[SenderError][frequenz.channels.SenderError].
384+
"""
385+
self._closed = True
386+
370387
def __str__(self) -> str:
371388
"""Return a string representation of this sender."""
372389
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_broadcast.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from ._exceptions import ChannelClosedError
1717
from ._generic import ChannelMessageT
1818
from ._receiver import Receiver, ReceiverStoppedError
19-
from ._sender import Sender, SenderError
19+
from ._sender import Sender, SenderClosedError, SenderError
2020

2121
_logger = logging.getLogger(__name__)
2222

@@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
334334
self._channel: Broadcast[_T] = channel
335335
"""The broadcast channel this sender belongs to."""
336336

337+
self._closed: bool = False
338+
"""Whether this sender is closed."""
339+
337340
@override
338341
async def send(self, message: _T, /) -> None:
339342
"""Send a message to all broadcast receivers.
@@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None:
345348
SenderError: If the underlying channel was closed.
346349
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
347350
set as the cause.
351+
SenderClosedError: If this sender was closed.
348352
"""
353+
if self._closed:
354+
raise SenderClosedError(self)
349355
# pylint: disable=protected-access
350356
if self._channel._closed:
351357
raise SenderError("The channel was closed", self) from ChannelClosedError(
@@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None:
365371
self._channel._recv_cv.notify_all()
366372
# pylint: enable=protected-access
367373

374+
@override
375+
async def aclose(self) -> None:
376+
"""Close this sender.
377+
378+
After a sender is closed, it can no longer be used to send messages. Any
379+
attempt to send a message through a closed sender will raise a
380+
[SenderClosedError][frequenz.channels.SenderClosedError].
381+
"""
382+
self._closed = True
383+
368384
def __str__(self) -> str:
369385
"""Return a string representation of this sender."""
370386
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_oneshot.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A channel that can send a single message."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import typing
10+
11+
from ._generic import ChannelMessageT
12+
from ._receiver import Receiver, ReceiverStoppedError
13+
from ._sender import Sender, SenderClosedError
14+
15+
16+
class _Empty:
17+
"""A sentinel indicating that no message has been sent."""
18+
19+
20+
_EMPTY = _Empty()
21+
22+
23+
class _Oneshot(typing.Generic[ChannelMessageT]):
24+
"""Internal representation of a one-shot channel.
25+
26+
A one-shot channel is a channel that can only send one message. After the first
27+
message is sent, the sender is closed and any further attempts to send a message
28+
will raise a `SenderClosedError`.
29+
"""
30+
31+
def __init__(self) -> None:
32+
"""Create a new one-shot channel."""
33+
self.message: ChannelMessageT | _Empty = _EMPTY
34+
self.closed: bool = False
35+
self.drained: bool = False
36+
self.event: asyncio.Event = asyncio.Event()
37+
38+
39+
class OneshotSender(Sender[ChannelMessageT]):
40+
"""A sender for a one-shot channel."""
41+
42+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
43+
"""Initialize this sender."""
44+
self._channel = channel
45+
46+
async def send(self, message: ChannelMessageT, /) -> None:
47+
"""Send a message through this sender."""
48+
if self._channel.closed:
49+
raise SenderClosedError(self)
50+
self._channel.message = message
51+
self._channel.closed = True
52+
self._channel.event.set()
53+
54+
async def aclose(self) -> None:
55+
"""Close this sender."""
56+
self._channel.closed = True
57+
if isinstance(self._channel.message, _Empty):
58+
self._channel.drained = True
59+
self._channel.event.set()
60+
61+
62+
class OneshotReceiver(Receiver[ChannelMessageT]):
63+
"""A receiver for a one-shot channel."""
64+
65+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
66+
"""Initialize this receiver."""
67+
self._channel = channel
68+
69+
async def ready(self) -> bool:
70+
"""Check if a message is ready to be received.
71+
72+
Returns:
73+
`True` if a message is ready to be received, `False` if the sender
74+
is closed and no message will be sent.
75+
"""
76+
if self._channel.drained:
77+
return False
78+
while not self._channel.closed:
79+
await self._channel.event.wait()
80+
if isinstance(self._channel.message, _Empty):
81+
return False
82+
return True
83+
84+
def consume(self) -> ChannelMessageT:
85+
"""Consume a message from this receiver.
86+
87+
Returns:
88+
The message that was sent through this channel.
89+
90+
Raises:
91+
ReceiverStoppedError: If the sender was closed without sending a message.
92+
"""
93+
if self._channel.drained:
94+
raise ReceiverStoppedError(self)
95+
96+
assert not isinstance(
97+
self._channel.message, _Empty
98+
), "`consume()` must be preceded by a call to `ready()`."
99+
100+
self._channel.drained = True
101+
self._channel.event.clear()
102+
return self._channel.message
103+
104+
105+
class OneshotChannel(
106+
tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]]
107+
):
108+
"""A channel that can send a single message.
109+
110+
A one-shot channel is a channel that can only send one message. After the first
111+
message is sent, the sender is closed and any further attempts to send a message
112+
will raise a `SenderClosedError`.
113+
114+
# Example
115+
116+
This example demonstrates how to use a one-shot channel to send a message
117+
from one task to another.
118+
119+
```python
120+
import asyncio
121+
122+
from frequenz.channels import OneshotChannel, OneshotSender
123+
124+
async def send(sender: OneshotSender[int]) -> None:
125+
await sender.send(42)
126+
127+
async def main() -> None:
128+
sender, receiver = OneshotChannel[int]()
129+
130+
async with asyncio.TaskGroup() as tg:
131+
tg.create_task(send(sender))
132+
assert await receiver.receive() == 42
133+
134+
asyncio.run(main())
135+
```
136+
"""
137+
138+
def __new__(cls) -> OneshotChannel[ChannelMessageT]:
139+
"""Create a new one-shot channel."""
140+
channel = _Oneshot[ChannelMessageT]()
141+
142+
return tuple.__new__(cls, (OneshotSender(channel), OneshotReceiver(channel)))

src/frequenz/channels/_sender.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
7070
SenderError: If there was an error sending the message.
7171
"""
7272

73+
@abstractmethod
74+
async def aclose(self) -> None:
75+
"""Close this sender.
76+
77+
After a sender is closed, it can no longer be used to send messages. Any
78+
attempt to send a message through a closed sender will raise a
79+
[SenderClosedError][frequenz.channels.SenderClosedError].
80+
"""
81+
7382

7483
class SenderError(Error, Generic[SenderMessageT_co]):
7584
"""An error that originated in a [Sender][frequenz.channels.Sender].
@@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
8897
super().__init__(message)
8998
self.sender: Sender[SenderMessageT_co] = sender
9099
"""The sender where the error happened."""
100+
101+
102+
class SenderClosedError(SenderError[SenderMessageT_co]):
103+
"""An error indicating that a send operation was attempted on a closed sender."""
104+
105+
def __init__(self, sender: Sender[SenderMessageT_co]):
106+
"""Initialize this error.
107+
108+
Args:
109+
sender: The [Sender][frequenz.channels.Sender] that was closed.
110+
"""
111+
super().__init__("Sender is closed", sender)

src/frequenz/channels/experimental/_relay_sender.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
to the senders it was created with.
88
"""
99

10-
import typing
10+
import asyncio
1111

1212
from typing_extensions import override
1313

1414
from .._generic import SenderMessageT_contra
1515
from .._sender import Sender
1616

1717

18-
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
18+
class RelaySender(Sender[SenderMessageT_contra]):
1919
"""A Sender for sending messages to multiple senders.
2020
2121
The `RelaySender` class takes multiple senders and forwards all the messages sent to
@@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
5757
"""
5858
for sender in self._senders:
5959
await sender.send(message)
60+
61+
@override
62+
async def aclose(self) -> None:
63+
"""Close this sender."""
64+
await asyncio.gather(*(sender.aclose() for sender in self._senders))

0 commit comments

Comments
 (0)