Skip to content

Commit c61539a

Browse files
committed
Add BroadcastChannel and deprecate Broadcast class
The `BroadcastChannel` would return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 3d721fd commit c61539a

2 files changed

Lines changed: 229 additions & 6 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"""
8181

8282
from ._anycast import Anycast
83-
from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender
83+
from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender
8484
from ._exceptions import ChannelClosedError, ChannelError, Error
8585
from ._generic import (
8686
ChannelMessageT,
@@ -101,16 +101,26 @@
101101
select,
102102
selected_from,
103103
)
104-
from ._sender import Sender, SenderClosedError, SenderError
104+
from ._sender import (
105+
ClonableSender,
106+
ClonableSubscribableSender,
107+
Sender,
108+
SenderClosedError,
109+
SenderError,
110+
SubscribableSender,
111+
)
105112

106113
__all__ = [
107114
"Anycast",
108115
"Broadcast",
116+
"BroadcastChannel",
109117
"BroadcastReceiver",
110118
"BroadcastSender",
111119
"ChannelClosedError",
112120
"ChannelError",
113121
"ChannelMessageT",
122+
"ClonableSender",
123+
"ClonableSubscribableSender",
114124
"Error",
115125
"ErroredChannelT_co",
116126
"LatestValueCache",
@@ -130,6 +140,7 @@
130140
"SenderError",
131141
"SenderMessageT_co",
132142
"SenderMessageT_contra",
143+
"SubscribableSender",
133144
"UnhandledSelectedError",
134145
"merge",
135146
"select",

src/frequenz/channels/_broadcast.py

Lines changed: 216 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
_logger = logging.getLogger(__name__)
2222

2323

24+
@deprecated("Please use BroadcastChannel instead.")
2425
class Broadcast( # pylint: disable=too-many-instance-attributes
2526
Generic[ChannelMessageT]
2627
):
@@ -337,8 +338,8 @@ class BroadcastSender(ClonableSubscribableSender[_T]):
337338
"""A sender to send messages to the broadcast channel.
338339
339340
Should not be created directly, but through the
340-
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
341-
method.
341+
[BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone]
342+
method of an existing sender.
342343
"""
343344

344345
def __init__(self, channel: Broadcast[_T], /) -> None:
@@ -452,8 +453,8 @@ class BroadcastReceiver(Receiver[_T]):
452453
"""A receiver to receive messages from the broadcast channel.
453454
454455
Should not be created directly, but through the
455-
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
456-
method.
456+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
457+
method of an existing sender.
457458
"""
458459

459460
def __init__(
@@ -604,3 +605,214 @@ def __repr__(self) -> str:
604605
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
605606
f"{self._channel!r}):<id={id(self)!r}, used={len(self._q)!r}>"
606607
)
608+
609+
610+
class BroadcastChannel(
611+
tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]]
612+
):
613+
"""A channel that deliver all messages to all receivers.
614+
615+
# Description
616+
617+
[BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple
618+
[senders][frequenz.channels.BroadcastSender] and multiple
619+
[receivers][frequenz.channels.BroadcastReceiver]. Each message sent through
620+
any of the senders will be received by all receivers.
621+
622+
<center>
623+
```bob
624+
.---------. msg1 msg1,msg2 .-----------.
625+
| Sender +------. .---------->| Receiver |
626+
'---------' | .----------. | '-----------'
627+
+----->| Channel +-----+
628+
.---------. | '----------' | .-----------.
629+
| Sender +------' '----------->| Receiver |
630+
'---------' msg2 msg1,msg2 '-----------'
631+
```
632+
</center>
633+
634+
!!! Note inline end "Characteristics"
635+
636+
* **Buffered:** Yes, with one buffer per receiver
637+
* **Buffer full policy:** Drop oldest message
638+
* **Multiple receivers:** Yes
639+
* **Multiple senders:** Yes
640+
* **Thread-safe:** No
641+
642+
This channel is buffered, and when messages are not being consumed fast
643+
enough and the buffer fills up, old messages will get dropped.
644+
645+
Each receiver has its own buffer, so messages will only be dropped for
646+
receivers that can't keep up with the senders, and not for the whole
647+
channel.
648+
649+
Instantiating this class will create a new broadcast channel, and return an
650+
initial sender and a receiver. Further senders and receivers can be created
651+
with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone],
652+
and
653+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
654+
methods respectively.
655+
656+
When a sender or a receiver is not needed anymore, it should be closed with
657+
[`aclose()`][frequenz.channels.BroadcastSender.aclose] or
658+
[`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent
659+
further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data,
660+
and will allow receivers to drain the pending items on their queues, but
661+
after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls
662+
will raise a
663+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
664+
665+
When all senders of a channel are closed, all its receivers will be
666+
automatically closed, and vice versa.
667+
668+
This channel is useful, for example, to implement a pub/sub pattern, where
669+
multiple consumers can subscribe to a channel to receive all messages.
670+
671+
# Examples
672+
673+
Example: Send a few numbers to a receiver
674+
This is a very simple example that sends a few numbers from a single sender to
675+
a single receiver.
676+
677+
```python
678+
import asyncio
679+
680+
from frequenz.channels import BroadcastChannel, Sender
681+
682+
683+
async def send(sender: Sender[int]) -> None:
684+
for message in range(3):
685+
print(f"sending {message}")
686+
await sender.send(message)
687+
await sender.aclose()
688+
689+
690+
async def main() -> None:
691+
sender, receiver = BroadcastChannel[int](name="numbers")
692+
693+
async with asyncio.TaskGroup() as task_group:
694+
task_group.create_task(send(sender))
695+
for _ in range(3):
696+
message = await receiver.receive()
697+
print(f"received {message}")
698+
await asyncio.sleep(0.1) # sleep (or work) with the data
699+
700+
701+
asyncio.run(main())
702+
```
703+
704+
The output should look something like (although the sending and received might
705+
appear more interleaved):
706+
707+
```
708+
sending 0
709+
sending 1
710+
sending 2
711+
received 0
712+
received 1
713+
received 2
714+
```
715+
716+
Example: Send a few number from multiple senders to multiple receivers
717+
This is a more complex example that sends a few numbers from multiple senders to
718+
multiple receivers, using a small buffer to force the senders to block.
719+
720+
```python
721+
import asyncio
722+
723+
from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender
724+
725+
726+
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
727+
for message in range(start, stop):
728+
print(f"{name} sending {message}")
729+
await sender.send(message)
730+
await sender.aclose()
731+
732+
733+
async def recv(name: str, receiver: Receiver[int]) -> None:
734+
try:
735+
async for message in receiver:
736+
print(f"{name} received {message}")
737+
await asyncio.sleep(0.1) # sleep (or work) with the data
738+
except ReceiverStoppedError:
739+
pass
740+
741+
742+
async def main() -> None:
743+
sender_1, receiver_1 = BroadcastChannel[int](name="numbers")
744+
sender_2 = sender_1.clone()
745+
receiver_2 = sender_1.subscribe()
746+
async with asyncio.TaskGroup() as task_group:
747+
task_group.create_task(send("sender_1", sender_1, 10, 13))
748+
task_group.create_task(send("sender_2", sender_2, 20, 22))
749+
task_group.create_task(recv("receiver_1", receiver_1))
750+
task_group.create_task(recv("receiver_2", receiver_2))
751+
752+
753+
asyncio.run(main())
754+
```
755+
756+
The output should look something like this(although the sending and received
757+
might appear interleaved in a different way):
758+
759+
```
760+
sender_1 sending 10
761+
sender_1 sending 11
762+
sender_1 sending 12
763+
sender_2 sending 20
764+
sender_2 sending 21
765+
receiver_1 received 10
766+
receiver_1 received 11
767+
receiver_1 received 12
768+
receiver_1 received 20
769+
receiver_1 received 21
770+
receiver_2 received 10
771+
receiver_2 received 11
772+
receiver_2 received 12
773+
receiver_2 received 20
774+
receiver_2 received 21
775+
```
776+
"""
777+
778+
def __new__(
779+
cls,
780+
name: str,
781+
resend_latest: bool = False,
782+
limit: int = 50,
783+
warn_on_overflow: bool = True,
784+
) -> BroadcastChannel[ChannelMessageT]:
785+
"""Create a new BroadcastChannel instance.
786+
787+
Args:
788+
name: The name of the channel. This is for logging purposes, and it will be
789+
shown in the string representation of the channel.
790+
resend_latest: When True, every time a new receiver is created with
791+
`new_receiver`, the last message seen by the channel will be sent to the
792+
new receiver automatically. This allows new receivers on slow streams to
793+
get the latest message as soon as they are created, without having to
794+
wait for the next message on the channel to arrive. It is safe to be
795+
set in data/reporting channels, but is not recommended for use in
796+
channels that stream control instructions.
797+
limit: Number of messages the receivers can hold in their buffers.
798+
warn_on_overflow: Whether to log a warning when a receiver's buffer is full
799+
and a message is dropped.
800+
801+
Returns:
802+
A new BroadcastChannel instance that can be destructured into an initial
803+
sender and receiver.
804+
"""
805+
channel = Broadcast[ChannelMessageT](
806+
name=name, resend_latest=resend_latest, auto_close=True
807+
)
808+
return tuple.__new__(
809+
cls,
810+
(
811+
channel.new_sender(),
812+
channel.new_receiver(
813+
name=f"{name}_receiver",
814+
limit=limit,
815+
warn_on_overflow=warn_on_overflow,
816+
),
817+
),
818+
)

0 commit comments

Comments
 (0)