Skip to content

Commit e8de199

Browse files
committed
Add one-shot channel implementation
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 73258cb commit e8de199

3 files changed

Lines changed: 231 additions & 0 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from ._latest_value_cache import LatestValueCache
9494
from ._merge import Merger, merge
95+
from ._oneshot import OneshotChannel
9596
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9697
from ._select import (
9798
Selected,
@@ -113,6 +114,7 @@
113114
"LatestValueCache",
114115
"MappedMessageT_co",
115116
"Merger",
117+
"OneshotChannel",
116118
"Receiver",
117119
"ReceiverError",
118120
"ReceiverMessageT_co",

src/frequenz/channels/_oneshot.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A channel that can send a single message."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import typing
10+
11+
from ._generic import ChannelMessageT
12+
from ._receiver import Receiver, ReceiverStoppedError
13+
from ._sender import Sender, SenderClosedError
14+
15+
16+
class _Empty:
17+
"""A sentinel indicating that no message has been sent."""
18+
19+
20+
_EMPTY = _Empty()
21+
22+
23+
class _Oneshot(typing.Generic[ChannelMessageT]):
24+
"""Internal representation of a one-shot channel.
25+
26+
A one-shot channel is a channel that can only send one message. After the first
27+
message is sent, the sender is closed and any further attempts to send a message
28+
will raise a `SenderClosedError`.
29+
30+
# Example
31+
32+
This example demonstrates how to use a one-shot channel to send a message
33+
from one task to another.
34+
35+
```python
36+
import asyncio
37+
38+
from frequenz.channels import OneshotChannel, OneshotSender
39+
40+
async def send(sender: OneshotSender[int]) -> None:
41+
await sender.send(42)
42+
43+
async def main() -> None:
44+
sender, receiver = OneshotChannel[int]()
45+
46+
async with asyncio.TaskGroup() as tg:
47+
tg.create_task(send(sender))
48+
assert await receiver.receive() == 42
49+
50+
asyncio.run(main())
51+
```
52+
"""
53+
54+
def __init__(self) -> None:
55+
"""Create a new one-shot channel."""
56+
self.message: ChannelMessageT | _Empty = _EMPTY
57+
self.closed: bool = False
58+
self.drained: bool = False
59+
self.event: asyncio.Event = asyncio.Event()
60+
61+
62+
class OneshotSender(Sender[ChannelMessageT]):
63+
"""A sender for a one-shot channel."""
64+
65+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
66+
"""Initialize this sender."""
67+
self._channel = channel
68+
69+
async def send(self, message: ChannelMessageT, /) -> None:
70+
"""Send a message through this sender."""
71+
if self._channel.closed:
72+
raise SenderClosedError(self)
73+
self._channel.message = message
74+
self._channel.closed = True
75+
self._channel.event.set()
76+
77+
async def aclose(self) -> None:
78+
"""Close this sender."""
79+
self._channel.closed = True
80+
if isinstance(self._channel.message, _Empty):
81+
self._channel.drained = True
82+
self._channel.event.set()
83+
84+
85+
class OneshotReceiver(Receiver[ChannelMessageT]):
86+
"""A receiver for a one-shot channel."""
87+
88+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
89+
"""Initialize this receiver."""
90+
self._channel = channel
91+
92+
async def ready(self) -> bool:
93+
"""Check if a message is ready to be received.
94+
95+
Returns:
96+
`True` if a message is ready to be received, `False` if the sender
97+
is closed and no message will be sent.
98+
"""
99+
if self._channel.drained:
100+
return False
101+
while not self._channel.closed:
102+
await self._channel.event.wait()
103+
if isinstance(self._channel.message, _Empty):
104+
return False
105+
return True
106+
107+
def consume(self) -> ChannelMessageT:
108+
"""Consume a message from this receiver.
109+
110+
Returns:
111+
The message that was sent through this channel.
112+
113+
Raises:
114+
ReceiverStoppedError: If the sender was closed without sending a message.
115+
"""
116+
if self._channel.drained:
117+
raise ReceiverStoppedError(self)
118+
119+
assert not isinstance(
120+
self._channel.message, _Empty
121+
), "`consume()` must be preceded by a call to `ready()`."
122+
123+
self._channel.drained = True
124+
self._channel.event.clear()
125+
return self._channel.message
126+
127+
128+
class OneshotChannel(
129+
tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]]
130+
):
131+
"""A one-shot channel.
132+
133+
A one-shot channel is a channel that can only send one message. After the first
134+
message is sent, the sender is closed and any further attempts to send a message
135+
will raise a `SenderClosedError`.
136+
"""
137+
138+
def __new__(cls) -> OneshotChannel[ChannelMessageT]:
139+
"""Create a new one-shot channel."""
140+
channel = _Oneshot[ChannelMessageT]()
141+
142+
return tuple.__new__(cls, (OneshotSender(channel), OneshotReceiver(channel)))

tests/test_oneshot.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the oneshot channel."""
5+
6+
import asyncio
7+
8+
import pytest
9+
10+
from frequenz.channels import (
11+
OneshotChannel,
12+
ReceiverStoppedError,
13+
SenderClosedError,
14+
)
15+
16+
17+
async def test_oneshot_recv_after_send() -> None:
18+
"""Test the oneshot function.
19+
20+
`receiver.receive()` is called after `sender.send()`.
21+
"""
22+
sender, receiver = OneshotChannel[int]()
23+
24+
await sender.send(42)
25+
assert await receiver.receive() == 42
26+
27+
with pytest.raises(SenderClosedError):
28+
await sender.send(43)
29+
with pytest.raises(ReceiverStoppedError):
30+
await receiver.receive()
31+
32+
33+
async def test_oneshot_recv_before_send() -> None:
34+
"""Test the oneshot function.
35+
36+
`receiver.receive()` is called before `sender.send()`.
37+
"""
38+
sender, receiver = OneshotChannel[int]()
39+
40+
task = asyncio.create_task(receiver.receive())
41+
42+
# Give the receiver a chance to start waiting
43+
await asyncio.sleep(0.0)
44+
45+
await sender.send(42)
46+
assert await task == 42
47+
48+
with pytest.raises(SenderClosedError):
49+
await sender.send(43)
50+
with pytest.raises(ReceiverStoppedError):
51+
await receiver.receive()
52+
53+
54+
async def test_oneshot_recv_after_sender_closed() -> None:
55+
"""Test that closing sender works without sending a message.
56+
57+
`receiver.receive()` is called after `sender.aclose()`.
58+
"""
59+
sender, receiver = OneshotChannel[int]()
60+
61+
await sender.aclose()
62+
63+
with pytest.raises(ReceiverStoppedError):
64+
await receiver.receive()
65+
with pytest.raises(SenderClosedError):
66+
await sender.send(4)
67+
68+
69+
async def test_oneshot_recv_before_sender_closed() -> None:
70+
"""Test that closing sender works without sending a message.
71+
72+
`receiver.receive()` is called before `sender.aclose()`.
73+
"""
74+
sender, receiver = OneshotChannel[int]()
75+
76+
task = asyncio.create_task(receiver.receive())
77+
78+
# Give the receiver a chance to start waiting
79+
await asyncio.sleep(0.0)
80+
81+
await sender.aclose()
82+
83+
with pytest.raises(ReceiverStoppedError):
84+
await task
85+
86+
with pytest.raises(SenderClosedError):
87+
await sender.send(4)

0 commit comments

Comments
 (0)