Skip to content

Commit 78c43c1

Browse files
committed
Make broadcast sender clonable and subscribable
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent f29ee59 commit 78c43c1

2 files changed

Lines changed: 48 additions & 3 deletions

File tree

src/frequenz/channels/_broadcast.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from ._exceptions import ChannelClosedError
1717
from ._generic import ChannelMessageT
1818
from ._receiver import Receiver, ReceiverStoppedError
19-
from ._sender import Sender, SenderError
19+
from ._sender import ClonableSubscribableSender, SenderError
2020

2121
_logger = logging.getLogger(__name__)
2222

@@ -269,7 +269,7 @@ async def close(self) -> None: # noqa: D402
269269
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
270270
return await self.aclose()
271271

272-
def new_sender(self) -> Sender[ChannelMessageT]:
272+
def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]:
273273
"""Return a new sender attached to this channel."""
274274
return _Sender(self)
275275

@@ -317,7 +317,7 @@ def __repr__(self) -> str:
317317
_T = TypeVar("_T")
318318

319319

320-
class _Sender(Sender[_T]):
320+
class _Sender(ClonableSubscribableSender[_T]):
321321
"""A sender to send messages to the broadcast channel.
322322
323323
Should not be created directly, but through the
@@ -365,6 +365,16 @@ async def send(self, message: _T, /) -> None:
365365
self._channel._recv_cv.notify_all()
366366
# pylint: enable=protected-access
367367

368+
@override
369+
def clone(self) -> _Sender[_T]:
370+
"""Return a clone of this sender."""
371+
return _Sender(self._channel)
372+
373+
@override
374+
def subscribe(self) -> Receiver[_T]:
375+
"""Return a new receiver attached to this sender's channel."""
376+
return self._channel.new_receiver()
377+
368378
def __str__(self) -> str:
369379
"""Return a string representation of this sender."""
370380
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_sender.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@
4949
```
5050
"""
5151

52+
from __future__ import annotations
53+
5254
from abc import ABC, abstractmethod
5355
from typing import Generic
5456

5557
from ._exceptions import Error
5658
from ._generic import SenderMessageT_co, SenderMessageT_contra
59+
from ._receiver import Receiver
5760

5861

5962
class Sender(ABC, Generic[SenderMessageT_contra]):
@@ -88,3 +91,35 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
8891
super().__init__(message)
8992
self.sender: Sender[SenderMessageT_co] = sender
9093
"""The sender where the error happened."""
94+
95+
96+
class SubscribableSender(Sender[SenderMessageT_contra], ABC):
97+
"""A [Sender][frequenz.channels.Sender] that can be subscribed to."""
98+
99+
@abstractmethod
100+
def subscribe(self) -> Receiver[SenderMessageT_contra]:
101+
"""Subscribe to this sender.
102+
103+
Returns:
104+
A new sender that sends messages to the same channel as this sender.
105+
"""
106+
107+
108+
class ClonableSender(Sender[SenderMessageT_contra], ABC):
109+
"""A [Sender][frequenz.channels.Sender] that can be cloned."""
110+
111+
@abstractmethod
112+
def clone(self) -> ClonableSender[SenderMessageT_contra]:
113+
"""Clone this sender.
114+
115+
Returns:
116+
A new sender that sends messages to the same channel as this sender.
117+
"""
118+
119+
120+
class ClonableSubscribableSender(
121+
SubscribableSender[SenderMessageT_contra],
122+
ClonableSender[SenderMessageT_contra],
123+
ABC,
124+
):
125+
"""A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to."""

0 commit comments

Comments
 (0)