Skip to content

Commit e44a7ff

Browse files
committed
Add one-shot channel implementation
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 73258cb commit e44a7ff

3 files changed

Lines changed: 204 additions & 0 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from ._latest_value_cache import LatestValueCache
9494
from ._merge import Merger, merge
95+
from ._oneshot import make_oneshot
9596
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9697
from ._select import (
9798
Selected,
@@ -125,6 +126,7 @@
125126
"SenderMessageT_co",
126127
"SenderMessageT_contra",
127128
"UnhandledSelectedError",
129+
"make_oneshot",
128130
"merge",
129131
"select",
130132
"selected_from",

src/frequenz/channels/_oneshot.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A channel that can send a single message."""
5+
6+
import asyncio
7+
import typing
8+
9+
from ._generic import ChannelMessageT
10+
from ._receiver import Receiver, ReceiverStoppedError
11+
from ._sender import Sender, SenderClosedError
12+
13+
14+
def make_oneshot(
15+
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
16+
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
17+
"""Create a one-shot channel.
18+
19+
A one-shot channel is a channel that can only send one message. After the first
20+
message is sent, the sender is closed and any further attempts to send a message
21+
will raise a `SenderClosedError`.
22+
23+
Args:
24+
message_type: The type of messages that can be sent through this channel.
25+
26+
Returns:
27+
A tuple of a sender and a receiver for this channel.
28+
"""
29+
channel = _OneShot[ChannelMessageT]()
30+
return _OneShotSender(channel), _OneShotReceiver(channel)
31+
32+
33+
class _Empty:
34+
"""A sentinel indicating that no message has been sent."""
35+
36+
37+
_EMPTY = _Empty()
38+
39+
40+
class _OneShot(typing.Generic[ChannelMessageT]):
41+
"""A one-shot channel.
42+
43+
A one-shot channel is a channel that can only send one message. After the first
44+
message is sent, the sender is closed and any further attempts to send a message
45+
will raise a `SenderClosedError`.
46+
"""
47+
48+
def __init__(self) -> None:
49+
"""Create a new one-shot channel."""
50+
self.message: ChannelMessageT | _Empty = _EMPTY
51+
self.closed: bool = False
52+
self.drained: bool = False
53+
self.event: asyncio.Event = asyncio.Event()
54+
55+
56+
class _OneShotSender(Sender[ChannelMessageT]):
57+
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
58+
"""Initialize this sender."""
59+
self._channel = channel
60+
61+
async def send(self, message: ChannelMessageT, /) -> None:
62+
"""Send a message through this sender."""
63+
if self._channel.closed:
64+
raise SenderClosedError(self)
65+
self._channel.message = message
66+
self._channel.closed = True
67+
self._channel.event.set()
68+
69+
async def aclose(self) -> None:
70+
"""Close this sender."""
71+
self._channel.closed = True
72+
if isinstance(self._channel.message, _Empty):
73+
self._channel.drained = True
74+
self._channel.event.set()
75+
76+
77+
class _OneShotReceiver(Receiver[ChannelMessageT]):
78+
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
79+
"""Initialize this receiver."""
80+
self._channel = channel
81+
82+
async def ready(self) -> bool:
83+
"""Check if a message is ready to be received.
84+
85+
Returns:
86+
`True` if a message is ready to be received, `False` if the sender
87+
is closed and no message will be sent.
88+
"""
89+
if self._channel.drained:
90+
return False
91+
while not self._channel.closed:
92+
await self._channel.event.wait()
93+
if isinstance(self._channel.message, _Empty):
94+
return False
95+
return True
96+
97+
def consume(self) -> ChannelMessageT:
98+
"""Consume a message from this receiver.
99+
100+
Returns:
101+
The message that was sent through this channel.
102+
103+
Raises:
104+
ReceiverStoppedError: If the sender was closed without sending a message.
105+
"""
106+
if self._channel.drained:
107+
raise ReceiverStoppedError(self)
108+
109+
assert not isinstance(
110+
self._channel.message, _Empty
111+
), "`consume()` must be preceded by a call to `ready()`."
112+
113+
self._channel.drained = True
114+
self._channel.event.clear()
115+
return self._channel.message

tests/test_oneshot.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the oneshot channel."""
5+
6+
import asyncio
7+
8+
import pytest
9+
10+
from frequenz.channels import (
11+
ReceiverStoppedError,
12+
SenderClosedError,
13+
make_oneshot,
14+
)
15+
16+
17+
async def test_oneshot_recv_after_send() -> None:
18+
"""Test the oneshot function.
19+
20+
`receiver.receive()` is called after `sender.send()`.
21+
"""
22+
sender, receiver = make_oneshot(int)
23+
24+
await sender.send(42)
25+
assert await receiver.receive() == 42
26+
27+
with pytest.raises(SenderClosedError):
28+
await sender.send(43)
29+
with pytest.raises(ReceiverStoppedError):
30+
await receiver.receive()
31+
32+
33+
async def test_oneshot_recv_before_send() -> None:
34+
"""Test the oneshot function.
35+
36+
`receiver.receive()` is called before `sender.send()`.
37+
"""
38+
sender, receiver = make_oneshot(int)
39+
40+
task = asyncio.create_task(receiver.receive())
41+
42+
# Give the receiver a chance to start waiting
43+
await asyncio.sleep(0.0)
44+
45+
await sender.send(42)
46+
assert await task == 42
47+
48+
with pytest.raises(SenderClosedError):
49+
await sender.send(43)
50+
with pytest.raises(ReceiverStoppedError):
51+
await receiver.receive()
52+
53+
54+
async def test_oneshot_recv_after_sender_closed() -> None:
55+
"""Test that closing sender works without sending a message.
56+
57+
`receiver.receive()` is called after `sender.aclose()`.
58+
"""
59+
sender, receiver = make_oneshot(int)
60+
61+
await sender.aclose()
62+
63+
with pytest.raises(ReceiverStoppedError):
64+
await receiver.receive()
65+
with pytest.raises(SenderClosedError):
66+
await sender.send(4)
67+
68+
69+
async def test_oneshot_recv_before_sender_closed() -> None:
70+
"""Test that closing sender works without sending a message.
71+
72+
`receiver.receive()` is called before `sender.aclose()`.
73+
"""
74+
sender, receiver = make_oneshot(int)
75+
76+
task = asyncio.create_task(receiver.receive())
77+
78+
# Give the receiver a chance to start waiting
79+
await asyncio.sleep(0.0)
80+
81+
await sender.aclose()
82+
83+
with pytest.raises(ReceiverStoppedError):
84+
await task
85+
86+
with pytest.raises(SenderClosedError):
87+
await sender.send(4)

0 commit comments

Comments
 (0)