Skip to content

Commit b307b3b

Browse files
committed
Make broadcast sender clonable and subscribable
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 50a80ad commit b307b3b

2 files changed

Lines changed: 55 additions & 3 deletions

File tree

src/frequenz/channels/_broadcast.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from ._exceptions import ChannelClosedError
1717
from ._generic import ChannelMessageT
1818
from ._receiver import Receiver, ReceiverStoppedError
19-
from ._sender import Sender, SenderClosedError, SenderError
19+
from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError
2020

2121
_logger = logging.getLogger(__name__)
2222

@@ -269,7 +269,7 @@ async def close(self) -> None: # noqa: D402
269269
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
270270
return await self.aclose()
271271

272-
def new_sender(self) -> Sender[ChannelMessageT]:
272+
def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]:
273273
"""Return a new sender attached to this channel."""
274274
return _Sender(self)
275275

@@ -317,7 +317,7 @@ def __repr__(self) -> str:
317317
_T = TypeVar("_T")
318318

319319

320-
class _Sender(Sender[_T]):
320+
class _Sender(ClonableSubscribableSender[_T]):
321321
"""A sender to send messages to the broadcast channel.
322322
323323
Should not be created directly, but through the
@@ -381,6 +381,23 @@ async def aclose(self) -> None:
381381
"""
382382
self._closed = True
383383

384+
@override
385+
def clone(self) -> _Sender[_T]:
386+
"""Return a clone of this sender."""
387+
return _Sender(self._channel)
388+
389+
@override
390+
def subscribe(
391+
self,
392+
name: str | None = None,
393+
limit: int = 50,
394+
warn_on_overflow: bool = True,
395+
) -> Receiver[_T]:
396+
"""Return a new receiver attached to this sender's channel."""
397+
return self._channel.new_receiver(
398+
name=name, limit=limit, warn_on_overflow=warn_on_overflow
399+
)
400+
384401
def __str__(self) -> str:
385402
"""Return a string representation of this sender."""
386403
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_sender.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@
4949
```
5050
"""
5151

52+
from __future__ import annotations
53+
5254
from abc import ABC, abstractmethod
5355
from typing import Generic
5456

5557
from ._exceptions import Error
5658
from ._generic import SenderMessageT_co, SenderMessageT_contra
59+
from ._receiver import Receiver
5760

5861

5962
class Sender(ABC, Generic[SenderMessageT_contra]):
@@ -109,3 +112,35 @@ def __init__(self, sender: Sender[SenderMessageT_co]):
109112
sender: The [Sender][frequenz.channels.Sender] that was closed.
110113
"""
111114
super().__init__("Sender is closed", sender)
115+
116+
117+
class SubscribableSender(Sender[SenderMessageT_contra], ABC):
118+
"""A [Sender][frequenz.channels.Sender] that can be subscribed to."""
119+
120+
@abstractmethod
121+
def subscribe(self) -> Receiver[SenderMessageT_contra]:
122+
"""Subscribe to this sender.
123+
124+
Returns:
125+
A new sender that sends messages to the same channel as this sender.
126+
"""
127+
128+
129+
class ClonableSender(Sender[SenderMessageT_contra], ABC):
130+
"""A [Sender][frequenz.channels.Sender] that can be cloned."""
131+
132+
@abstractmethod
133+
def clone(self) -> ClonableSender[SenderMessageT_contra]:
134+
"""Clone this sender.
135+
136+
Returns:
137+
A new sender that sends messages to the same channel as this sender.
138+
"""
139+
140+
141+
class ClonableSubscribableSender(
142+
SubscribableSender[SenderMessageT_contra],
143+
ClonableSender[SenderMessageT_contra],
144+
ABC,
145+
):
146+
"""A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to."""

0 commit comments

Comments
 (0)