Skip to content

Commit 318ad66

Browse files
committed
Support closing of Senders
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent f29ee59 commit 318ad66

5 files changed

Lines changed: 65 additions & 5 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@
100100
select,
101101
selected_from,
102102
)
103-
from ._sender import Sender, SenderError
103+
from ._sender import Sender, SenderClosedError, SenderError
104104

105105
__all__ = [
106106
"Anycast",
@@ -120,6 +120,7 @@
120120
"SelectError",
121121
"Selected",
122122
"Sender",
123+
"SenderClosedError",
123124
"SenderError",
124125
"SenderMessageT_co",
125126
"SenderMessageT_contra",

src/frequenz/channels/_anycast.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ._exceptions import ChannelClosedError
1616
from ._generic import ChannelMessageT
1717
from ._receiver import Receiver, ReceiverStoppedError
18-
from ._sender import Sender, SenderError
18+
from ._sender import Sender, SenderClosedError, SenderError
1919

2020
_logger = logging.getLogger(__name__)
2121

@@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
327327
self._channel: Anycast[_T] = channel
328328
"""The channel that this sender belongs to."""
329329

330+
self._closed: bool = False
331+
"""Whether the sender is closed."""
332+
330333
@override
331334
async def send(self, message: _T, /) -> None:
332335
"""Send a message across the channel.
@@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
343346
SenderError: If the underlying channel was closed.
344347
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
345348
set as the cause.
349+
SenderClosedError: If this sender was closed.
346350
"""
351+
if self._closed:
352+
raise SenderClosedError(self)
353+
347354
# pylint: disable=protected-access
348355
if self._channel._closed:
349356
raise SenderError("The channel was closed", self) from ChannelClosedError(
@@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
367374
self._channel._recv_cv.notify(1)
368375
# pylint: enable=protected-access
369376

377+
@override
378+
async def aclose(self) -> None:
379+
"""Close this sender.
380+
381+
After closing, the sender will not be able to send any more messages. Any
382+
attempt to send a message through a closed sender will raise a
383+
[SenderError][frequenz.channels.SenderError].
384+
"""
385+
self._closed = True
386+
370387
def __str__(self) -> str:
371388
"""Return a string representation of this sender."""
372389
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_broadcast.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from ._exceptions import ChannelClosedError
1717
from ._generic import ChannelMessageT
1818
from ._receiver import Receiver, ReceiverStoppedError
19-
from ._sender import Sender, SenderError
19+
from ._sender import Sender, SenderClosedError, SenderError
2020

2121
_logger = logging.getLogger(__name__)
2222

@@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
334334
self._channel: Broadcast[_T] = channel
335335
"""The broadcast channel this sender belongs to."""
336336

337+
self._closed: bool = False
338+
"""Whether this sender is closed."""
339+
337340
@override
338341
async def send(self, message: _T, /) -> None:
339342
"""Send a message to all broadcast receivers.
@@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None:
345348
SenderError: If the underlying channel was closed.
346349
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
347350
set as the cause.
351+
SenderClosedError: If this sender was closed.
348352
"""
353+
if self._closed:
354+
raise SenderClosedError(self)
349355
# pylint: disable=protected-access
350356
if self._channel._closed:
351357
raise SenderError("The channel was closed", self) from ChannelClosedError(
@@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None:
365371
self._channel._recv_cv.notify_all()
366372
# pylint: enable=protected-access
367373

374+
@override
375+
async def aclose(self) -> None:
376+
"""Close this sender.
377+
378+
After a sender is closed, it can no longer be used to send messages. Any
379+
attempt to send a message through a closed sender will raise a
380+
[SenderError][frequenz.channels.SenderError].
381+
"""
382+
self._closed = True
383+
368384
def __str__(self) -> str:
369385
"""Return a string representation of this sender."""
370386
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_sender.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
7070
SenderError: If there was an error sending the message.
7171
"""
7272

73+
@abstractmethod
74+
async def aclose(self) -> None:
75+
"""Close this sender.
76+
77+
After a sender is closed, it can no longer be used to send messages. Any
78+
attempt to send a message through a closed sender will raise a
79+
[SenderClosedError][frequenz.channels.SenderClosedError].
80+
"""
81+
7382

7483
class SenderError(Error, Generic[SenderMessageT_co]):
7584
"""An error that originated in a [Sender][frequenz.channels.Sender].
@@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
8897
super().__init__(message)
8998
self.sender: Sender[SenderMessageT_co] = sender
9099
"""The sender where the error happened."""
100+
101+
102+
class SenderClosedError(SenderError[SenderMessageT_co]):
103+
"""An error indicating that a send operation was attempted a closed sender."""
104+
105+
def __init__(self, sender: Sender[SenderMessageT_co]):
106+
"""Initialize this error.
107+
108+
Args:
109+
sender: The [Sender][frequenz.channels.Sender] that was closed.
110+
"""
111+
super().__init__("Sender is closed", sender)

src/frequenz/channels/experimental/_relay_sender.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
to the senders it was created with.
88
"""
99

10-
import typing
10+
import asyncio
1111

1212
from typing_extensions import override
1313

1414
from .._generic import SenderMessageT_contra
1515
from .._sender import Sender
1616

1717

18-
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
18+
class RelaySender(Sender[SenderMessageT_contra]):
1919
"""A Sender for sending messages to multiple senders.
2020
2121
The `RelaySender` class takes multiple senders and forwards all the messages sent to
@@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
5757
"""
5858
for sender in self._senders:
5959
await sender.send(message)
60+
61+
@override
62+
async def aclose(self) -> None:
63+
"""Close this sender."""
64+
await asyncio.gather(*(sender.aclose() for sender in self._senders))

0 commit comments

Comments
 (0)