Skip to content

Commit f493243

Browse files
committed
Add one-shot channel implementation
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 73258cb commit f493243

3 files changed

Lines changed: 210 additions & 0 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from ._latest_value_cache import LatestValueCache
9494
from ._merge import Merger, merge
95+
from ._oneshot import make_oneshot
9596
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9697
from ._select import (
9798
Selected,
@@ -125,6 +126,7 @@
125126
"SenderMessageT_co",
126127
"SenderMessageT_contra",
127128
"UnhandledSelectedError",
129+
"make_oneshot",
128130
"merge",
129131
"select",
130132
"selected_from",

src/frequenz/channels/_oneshot.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A channel that can send a single message."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import typing
10+
11+
from ._generic import ChannelMessageT
12+
from ._receiver import Receiver, ReceiverStoppedError
13+
from ._sender import Sender, SenderClosedError
14+
15+
16+
def make_oneshot(
17+
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
18+
) -> tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]]:
19+
"""Create a one-shot channel.
20+
21+
A one-shot channel is a channel that can only send one message. After the first
22+
message is sent, the sender is closed and any further attempts to send a message
23+
will raise a `SenderClosedError`.
24+
25+
Args:
26+
message_type: The type of messages that can be sent through this channel.
27+
28+
Returns:
29+
A tuple of a sender and a receiver for this channel.
30+
"""
31+
channel = _Oneshot[ChannelMessageT]()
32+
return OneshotSender(channel), OneshotReceiver(channel)
33+
34+
35+
class _Empty:
36+
"""A sentinel indicating that no message has been sent."""
37+
38+
39+
_EMPTY = _Empty()
40+
41+
42+
class _Oneshot(typing.Generic[ChannelMessageT]):
43+
"""A one-shot channel.
44+
45+
A one-shot channel is a channel that can only send one message. After the first
46+
message is sent, the sender is closed and any further attempts to send a message
47+
will raise a `SenderClosedError`.
48+
"""
49+
50+
def __init__(self) -> None:
51+
"""Create a new one-shot channel."""
52+
self.message: ChannelMessageT | _Empty = _EMPTY
53+
self.closed: bool = False
54+
self.drained: bool = False
55+
self.event: asyncio.Event = asyncio.Event()
56+
57+
58+
class OneshotSender(Sender[ChannelMessageT]):
59+
"""A sender for a one-shot channel."""
60+
61+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
62+
"""Initialize this sender."""
63+
self._channel = channel
64+
65+
async def send(self, message: ChannelMessageT, /) -> None:
66+
"""Send a message through this sender."""
67+
if self._channel.closed:
68+
raise SenderClosedError(self)
69+
self._channel.message = message
70+
self._channel.closed = True
71+
self._channel.event.set()
72+
73+
async def aclose(self) -> None:
74+
"""Close this sender."""
75+
self._channel.closed = True
76+
if isinstance(self._channel.message, _Empty):
77+
self._channel.drained = True
78+
self._channel.event.set()
79+
80+
81+
class OneshotReceiver(Receiver[ChannelMessageT]):
82+
"""A receiver for a one-shot channel."""
83+
84+
def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
85+
"""Initialize this receiver."""
86+
self._channel = channel
87+
88+
async def ready(self) -> bool:
89+
"""Check if a message is ready to be received.
90+
91+
Returns:
92+
`True` if a message is ready to be received, `False` if the sender
93+
is closed and no message will be sent.
94+
"""
95+
if self._channel.drained:
96+
return False
97+
while not self._channel.closed:
98+
await self._channel.event.wait()
99+
if isinstance(self._channel.message, _Empty):
100+
return False
101+
return True
102+
103+
def consume(self) -> ChannelMessageT:
104+
"""Consume a message from this receiver.
105+
106+
Returns:
107+
The message that was sent through this channel.
108+
109+
Raises:
110+
ReceiverStoppedError: If the sender was closed without sending a message.
111+
"""
112+
if self._channel.drained:
113+
raise ReceiverStoppedError(self)
114+
115+
assert not isinstance(
116+
self._channel.message, _Empty
117+
), "`consume()` must be preceded by a call to `ready()`."
118+
119+
self._channel.drained = True
120+
self._channel.event.clear()
121+
return self._channel.message

tests/test_oneshot.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the oneshot channel."""
5+
6+
import asyncio
7+
8+
import pytest
9+
10+
from frequenz.channels import (
11+
ReceiverStoppedError,
12+
SenderClosedError,
13+
make_oneshot,
14+
)
15+
16+
17+
async def test_oneshot_recv_after_send() -> None:
18+
"""Test the oneshot function.
19+
20+
`receiver.receive()` is called after `sender.send()`.
21+
"""
22+
sender, receiver = make_oneshot(int)
23+
24+
await sender.send(42)
25+
assert await receiver.receive() == 42
26+
27+
with pytest.raises(SenderClosedError):
28+
await sender.send(43)
29+
with pytest.raises(ReceiverStoppedError):
30+
await receiver.receive()
31+
32+
33+
async def test_oneshot_recv_before_send() -> None:
34+
"""Test the oneshot function.
35+
36+
`receiver.receive()` is called before `sender.send()`.
37+
"""
38+
sender, receiver = make_oneshot(int)
39+
40+
task = asyncio.create_task(receiver.receive())
41+
42+
# Give the receiver a chance to start waiting
43+
await asyncio.sleep(0.0)
44+
45+
await sender.send(42)
46+
assert await task == 42
47+
48+
with pytest.raises(SenderClosedError):
49+
await sender.send(43)
50+
with pytest.raises(ReceiverStoppedError):
51+
await receiver.receive()
52+
53+
54+
async def test_oneshot_recv_after_sender_closed() -> None:
55+
"""Test that closing sender works without sending a message.
56+
57+
`receiver.receive()` is called after `sender.aclose()`.
58+
"""
59+
sender, receiver = make_oneshot(int)
60+
61+
await sender.aclose()
62+
63+
with pytest.raises(ReceiverStoppedError):
64+
await receiver.receive()
65+
with pytest.raises(SenderClosedError):
66+
await sender.send(4)
67+
68+
69+
async def test_oneshot_recv_before_sender_closed() -> None:
70+
"""Test that closing sender works without sending a message.
71+
72+
`receiver.receive()` is called before `sender.aclose()`.
73+
"""
74+
sender, receiver = make_oneshot(int)
75+
76+
task = asyncio.create_task(receiver.receive())
77+
78+
# Give the receiver a chance to start waiting
79+
await asyncio.sleep(0.0)
80+
81+
await sender.aclose()
82+
83+
with pytest.raises(ReceiverStoppedError):
84+
await task
85+
86+
with pytest.raises(SenderClosedError):
87+
await sender.send(4)

0 commit comments

Comments
 (0)