Skip to content

Commit ddfab00

Browse files
committed
Add BroadcastChannel and deprecate Broadcast class
The `BroadcastChannel` would return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 389574c commit ddfab00

3 files changed

Lines changed: 230 additions & 7 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"""
8181

8282
from ._anycast import Anycast
83-
from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender
83+
from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender
8484
from ._exceptions import ChannelClosedError, ChannelError, Error
8585
from ._generic import (
8686
ChannelMessageT,
@@ -101,16 +101,26 @@
101101
select,
102102
selected_from,
103103
)
104-
from ._sender import Sender, SenderClosedError, SenderError
104+
from ._sender import (
105+
ClonableSender,
106+
ClonableSubscribableSender,
107+
Sender,
108+
SenderClosedError,
109+
SenderError,
110+
SubscribableSender,
111+
)
105112

106113
__all__ = [
107114
"Anycast",
108115
"Broadcast",
116+
"BroadcastChannel",
109117
"BroadcastReceiver",
110118
"BroadcastSender",
111119
"ChannelClosedError",
112120
"ChannelError",
113121
"ChannelMessageT",
122+
"ClonableSender",
123+
"ClonableSubscribableSender",
114124
"Error",
115125
"ErroredChannelT_co",
116126
"LatestValueCache",
@@ -130,6 +140,7 @@
130140
"SenderError",
131141
"SenderMessageT_co",
132142
"SenderMessageT_contra",
143+
"SubscribableSender",
133144
"UnhandledSelectedError",
134145
"merge",
135146
"select",

src/frequenz/channels/_broadcast.py

Lines changed: 216 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
_logger = logging.getLogger(__name__)
2222

2323

24+
@deprecated("Please use BroadcastChannel instead.")
2425
class Broadcast( # pylint: disable=too-many-instance-attributes
2526
Generic[ChannelMessageT]
2627
):
@@ -337,8 +338,8 @@ class BroadcastSender(ClonableSubscribableSender[_T]):
337338
"""A sender to send messages to the broadcast channel.
338339
339340
Should not be created directly, but through the
340-
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
341-
method.
341+
[BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone]
342+
method of an existing sender.
342343
"""
343344

344345
def __init__(self, channel: Broadcast[_T], /) -> None:
@@ -467,8 +468,8 @@ class BroadcastReceiver(Receiver[_T]):
467468
"""A receiver to receive messages from the broadcast channel.
468469
469470
Should not be created directly, but through the
470-
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
471-
method.
471+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
472+
method of an existing sender.
472473
"""
473474

474475
def __init__(
@@ -617,3 +618,214 @@ def __repr__(self) -> str:
617618
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
618619
f"{self._channel!r}):<id={id(self)!r}, used={len(self._q)!r}>"
619620
)
621+
622+
623+
class BroadcastChannel(
624+
tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]]
625+
):
626+
"""A channel that deliver all messages to all receivers.
627+
628+
# Description
629+
630+
[BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple
631+
[senders][frequenz.channels.BroadcastSender] and multiple
632+
[receivers][frequenz.channels.BroadcastReceiver]. Each message sent through
633+
any of the senders will be received by all receivers.
634+
635+
<center>
636+
```bob
637+
.---------. msg1 msg1,msg2 .-----------.
638+
| Sender +------. .---------->| Receiver |
639+
'---------' | .----------. | '-----------'
640+
+----->| Channel +-----+
641+
.---------. | '----------' | .-----------.
642+
| Sender +------' '----------->| Receiver |
643+
'---------' msg2 msg1,msg2 '-----------'
644+
```
645+
</center>
646+
647+
!!! Note inline end "Characteristics"
648+
649+
* **Buffered:** Yes, with one buffer per receiver
650+
* **Buffer full policy:** Drop oldest message
651+
* **Multiple receivers:** Yes
652+
* **Multiple senders:** Yes
653+
* **Thread-safe:** No
654+
655+
This channel is buffered, and when messages are not being consumed fast
656+
enough and the buffer fills up, old messages will get dropped.
657+
658+
Each receiver has its own buffer, so messages will only be dropped for
659+
receivers that can't keep up with the senders, and not for the whole
660+
channel.
661+
662+
Instantiating this class will create a new broadcast channel, and return an
663+
initial sender and a receiver. Further senders and receivers can be created
664+
with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone],
665+
and
666+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
667+
methods respectively.
668+
669+
When a sender or a receiver is not needed anymore, it should be closed with
670+
[`aclose()`][frequenz.channels.BroadcastSender.aclose] or
671+
[`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent
672+
further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data,
673+
and will allow receivers to drain the pending items on their queues, but
674+
after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls
675+
will raise a
676+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
677+
678+
When all senders of a channel are closed, all its receivers will be
679+
automatically closed, and vice versa.
680+
681+
This channel is useful, for example, to implement a pub/sub pattern, where
682+
multiple consumers can subscribe to a channel to receive all messages.
683+
684+
# Examples
685+
686+
Example: Send a few numbers to a receiver
687+
This is a very simple example that sends a few numbers from a single sender to
688+
a single receiver.
689+
690+
```python
691+
import asyncio
692+
693+
from frequenz.channels import BroadcastChannel, Sender
694+
695+
696+
async def send(sender: Sender[int]) -> None:
697+
for message in range(3):
698+
print(f"sending {message}")
699+
await sender.send(message)
700+
await sender.aclose()
701+
702+
703+
async def main() -> None:
704+
sender, receiver = BroadcastChannel[int](name="numbers")
705+
706+
async with asyncio.TaskGroup() as task_group:
707+
task_group.create_task(send(sender))
708+
for _ in range(3):
709+
message = await receiver.receive()
710+
print(f"received {message}")
711+
await asyncio.sleep(0.1) # sleep (or work) with the data
712+
713+
714+
asyncio.run(main())
715+
```
716+
717+
The output should look something like (although the sending and received might
718+
appear more interleaved):
719+
720+
```
721+
sending 0
722+
sending 1
723+
sending 2
724+
received 0
725+
received 1
726+
received 2
727+
```
728+
729+
Example: Send a few number from multiple senders to multiple receivers
730+
This is a more complex example that sends a few numbers from multiple senders to
731+
multiple receivers, using a small buffer to force the senders to block.
732+
733+
```python
734+
import asyncio
735+
736+
from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender
737+
738+
739+
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
740+
for message in range(start, stop):
741+
print(f"{name} sending {message}")
742+
await sender.send(message)
743+
await sender.aclose()
744+
745+
746+
async def recv(name: str, receiver: Receiver[int]) -> None:
747+
try:
748+
async for message in receiver:
749+
print(f"{name} received {message}")
750+
await asyncio.sleep(0.1) # sleep (or work) with the data
751+
except ReceiverStoppedError:
752+
pass
753+
754+
755+
async def main() -> None:
756+
sender_1, receiver_1 = BroadcastChannel[int](name="numbers")
757+
sender_2 = sender_1.clone()
758+
receiver_2 = sender_1.subscribe()
759+
async with asyncio.TaskGroup() as task_group:
760+
task_group.create_task(send("sender_1", sender_1, 10, 13))
761+
task_group.create_task(send("sender_2", sender_2, 20, 22))
762+
task_group.create_task(recv("receiver_1", receiver_1))
763+
task_group.create_task(recv("receiver_2", receiver_2))
764+
765+
766+
asyncio.run(main())
767+
```
768+
769+
The output should look something like this(although the sending and received
770+
might appear interleaved in a different way):
771+
772+
```
773+
sender_1 sending 10
774+
sender_1 sending 11
775+
sender_1 sending 12
776+
sender_2 sending 20
777+
sender_2 sending 21
778+
receiver_1 received 10
779+
receiver_1 received 11
780+
receiver_1 received 12
781+
receiver_1 received 20
782+
receiver_1 received 21
783+
receiver_2 received 10
784+
receiver_2 received 11
785+
receiver_2 received 12
786+
receiver_2 received 20
787+
receiver_2 received 21
788+
```
789+
"""
790+
791+
def __new__(
792+
cls,
793+
name: str,
794+
resend_latest: bool = False,
795+
limit: int = 50,
796+
warn_on_overflow: bool = True,
797+
) -> BroadcastChannel[ChannelMessageT]:
798+
"""Create a new BroadcastChannel instance.
799+
800+
Args:
801+
name: The name of the channel. This is for logging purposes, and it will be
802+
shown in the string representation of the channel.
803+
resend_latest: When True, every time a new receiver is created with
804+
`new_receiver`, the last message seen by the channel will be sent to the
805+
new receiver automatically. This allows new receivers on slow streams to
806+
get the latest message as soon as they are created, without having to
807+
wait for the next message on the channel to arrive. It is safe to be
808+
set in data/reporting channels, but is not recommended for use in
809+
channels that stream control instructions.
810+
limit: Number of messages the receivers can hold in their buffers.
811+
warn_on_overflow: Whether to log a warning when a receiver's buffer is full
812+
and a message is dropped.
813+
814+
Returns:
815+
A new BroadcastChannel instance that can be destructured into an initial
816+
sender and receiver.
817+
"""
818+
channel = Broadcast[ChannelMessageT](
819+
name=name, resend_latest=resend_latest, auto_close=True
820+
)
821+
return tuple.__new__(
822+
cls,
823+
(
824+
channel.new_sender(),
825+
channel.new_receiver(
826+
name=f"{name}_receiver",
827+
limit=limit,
828+
warn_on_overflow=warn_on_overflow,
829+
),
830+
),
831+
)

src/frequenz/channels/_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def subscribe(self) -> Receiver[SenderMessageT_contra]:
122122
"""Subscribe to this sender.
123123
124124
Returns:
125-
A new sender that sends messages to the same channel as this sender.
125+
A new receiver attached to this sender's channel.
126126
"""
127127

128128

0 commit comments

Comments
 (0)