Skip to content

Commit 7e77cfd

Browse files
committed
Add BroadcastChannel and deprecate Broadcast class
The `BroadcastChannel` would return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 3d721fd commit 7e77cfd

3 files changed

Lines changed: 241 additions & 8 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"""
8181

8282
from ._anycast import Anycast
83-
from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender
83+
from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender
8484
from ._exceptions import ChannelClosedError, ChannelError, Error
8585
from ._generic import (
8686
ChannelMessageT,
@@ -101,16 +101,26 @@
101101
select,
102102
selected_from,
103103
)
104-
from ._sender import Sender, SenderClosedError, SenderError
104+
from ._sender import (
105+
ClonableSender,
106+
ClonableSubscribableSender,
107+
Sender,
108+
SenderClosedError,
109+
SenderError,
110+
SubscribableSender,
111+
)
105112

106113
__all__ = [
107114
"Anycast",
108115
"Broadcast",
116+
"BroadcastChannel",
109117
"BroadcastReceiver",
110118
"BroadcastSender",
111119
"ChannelClosedError",
112120
"ChannelError",
113121
"ChannelMessageT",
122+
"ClonableSender",
123+
"ClonableSubscribableSender",
114124
"Error",
115125
"ErroredChannelT_co",
116126
"LatestValueCache",
@@ -130,6 +140,7 @@
130140
"SenderError",
131141
"SenderMessageT_co",
132142
"SenderMessageT_contra",
143+
"SubscribableSender",
133144
"UnhandledSelectedError",
134145
"merge",
135146
"select",

src/frequenz/channels/_broadcast.py

Lines changed: 227 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
_logger = logging.getLogger(__name__)
2222

2323

24+
@deprecated("Please use BroadcastChannel instead.")
2425
class Broadcast( # pylint: disable=too-many-instance-attributes
2526
Generic[ChannelMessageT]
2627
):
@@ -337,8 +338,8 @@ class BroadcastSender(ClonableSubscribableSender[_T]):
337338
"""A sender to send messages to the broadcast channel.
338339
339340
Should not be created directly, but through the
340-
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
341-
method.
341+
[BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone]
342+
method of an existing sender.
342343
"""
343344

344345
def __init__(self, channel: Broadcast[_T], /) -> None:
@@ -434,7 +435,17 @@ def subscribe(
434435
limit: int = 50,
435436
warn_on_overflow: bool = True,
436437
) -> BroadcastReceiver[_T]:
437-
"""Return a new receiver attached to this sender's channel."""
438+
"""Return a new receiver attached to this sender's channel.
439+
440+
Args:
441+
name: A name to identify the receiver in the logs.
442+
limit: Number of messages the receiver can hold in its buffer.
443+
warn_on_overflow: Whether to log a warning when the receiver's buffer is
444+
full and a message is dropped.
445+
446+
Returns:
447+
A new receiver attached to this sender's channel.
448+
"""
438449
return self._channel.new_receiver(
439450
name=name, limit=limit, warn_on_overflow=warn_on_overflow
440451
)
@@ -452,8 +463,8 @@ class BroadcastReceiver(Receiver[_T]):
452463
"""A receiver to receive messages from the broadcast channel.
453464
454465
Should not be created directly, but through the
455-
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
456-
method.
466+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
467+
method of an existing sender.
457468
"""
458469

459470
def __init__(
@@ -604,3 +615,214 @@ def __repr__(self) -> str:
604615
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
605616
f"{self._channel!r}):<id={id(self)!r}, used={len(self._q)!r}>"
606617
)
618+
619+
620+
class BroadcastChannel(
621+
tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]]
622+
):
623+
"""A channel that deliver all messages to all receivers.
624+
625+
# Description
626+
627+
[BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple
628+
[senders][frequenz.channels.BroadcastSender] and multiple
629+
[receivers][frequenz.channels.BroadcastReceiver]. Each message sent through
630+
any of the senders will be received by all receivers.
631+
632+
<center>
633+
```bob
634+
.---------. msg1 msg1,msg2 .-----------.
635+
| Sender +------. .---------->| Receiver |
636+
'---------' | .----------. | '-----------'
637+
+----->| Channel +-----+
638+
.---------. | '----------' | .-----------.
639+
| Sender +------' '----------->| Receiver |
640+
'---------' msg2 msg1,msg2 '-----------'
641+
```
642+
</center>
643+
644+
!!! Note inline end "Characteristics"
645+
646+
* **Buffered:** Yes, with one buffer per receiver
647+
* **Buffer full policy:** Drop oldest message
648+
* **Multiple receivers:** Yes
649+
* **Multiple senders:** Yes
650+
* **Thread-safe:** No
651+
652+
This channel is buffered, and when messages are not being consumed fast
653+
enough and the buffer fills up, old messages will get dropped.
654+
655+
Each receiver has its own buffer, so messages will only be dropped for
656+
receivers that can't keep up with the senders, and not for the whole
657+
channel.
658+
659+
Instantiating this class will create a new broadcast channel, and return an
660+
initial sender and a receiver. Further senders and receivers can be created
661+
with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone],
662+
and
663+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
664+
methods respectively.
665+
666+
When a sender or a receiver is not needed anymore, it should be closed with
667+
[`aclose()`][frequenz.channels.BroadcastSender.aclose] or
668+
[`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent
669+
further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data,
670+
and will allow receivers to drain the pending items on their queues, but
671+
after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls
672+
will raise a
673+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
674+
675+
When all senders of a channel are closed, all its receivers will be
676+
automatically closed, and vice versa.
677+
678+
This channel is useful, for example, to implement a pub/sub pattern, where
679+
multiple consumers can subscribe to a channel to receive all messages.
680+
681+
# Examples
682+
683+
Example: Send a few numbers to a receiver
684+
This is a very simple example that sends a few numbers from a single sender to
685+
a single receiver.
686+
687+
```python
688+
import asyncio
689+
690+
from frequenz.channels import BroadcastChannel, Sender
691+
692+
693+
async def send(sender: Sender[int]) -> None:
694+
for message in range(3):
695+
print(f"sending {message}")
696+
await sender.send(message)
697+
await sender.aclose()
698+
699+
700+
async def main() -> None:
701+
sender, receiver = BroadcastChannel[int](name="numbers")
702+
703+
async with asyncio.TaskGroup() as task_group:
704+
task_group.create_task(send(sender))
705+
for _ in range(3):
706+
message = await receiver.receive()
707+
print(f"received {message}")
708+
await asyncio.sleep(0.1) # sleep (or work) with the data
709+
710+
711+
asyncio.run(main())
712+
```
713+
714+
The output should look something like (although the sending and received might
715+
appear more interleaved):
716+
717+
```
718+
sending 0
719+
sending 1
720+
sending 2
721+
received 0
722+
received 1
723+
received 2
724+
```
725+
726+
Example: Send a few number from multiple senders to multiple receivers
727+
This is a more complex example that sends a few numbers from multiple senders to
728+
multiple receivers, using a small buffer to force the senders to block.
729+
730+
```python
731+
import asyncio
732+
733+
from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender
734+
735+
736+
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
737+
for message in range(start, stop):
738+
print(f"{name} sending {message}")
739+
await sender.send(message)
740+
await sender.aclose()
741+
742+
743+
async def recv(name: str, receiver: Receiver[int]) -> None:
744+
try:
745+
async for message in receiver:
746+
print(f"{name} received {message}")
747+
await asyncio.sleep(0.1) # sleep (or work) with the data
748+
except ReceiverStoppedError:
749+
pass
750+
751+
752+
async def main() -> None:
753+
sender_1, receiver_1 = BroadcastChannel[int](name="numbers")
754+
sender_2 = sender_1.clone()
755+
receiver_2 = sender_1.subscribe()
756+
async with asyncio.TaskGroup() as task_group:
757+
task_group.create_task(send("sender_1", sender_1, 10, 13))
758+
task_group.create_task(send("sender_2", sender_2, 20, 22))
759+
task_group.create_task(recv("receiver_1", receiver_1))
760+
task_group.create_task(recv("receiver_2", receiver_2))
761+
762+
763+
asyncio.run(main())
764+
```
765+
766+
The output should look something like this(although the sending and received
767+
might appear interleaved in a different way):
768+
769+
```
770+
sender_1 sending 10
771+
sender_1 sending 11
772+
sender_1 sending 12
773+
sender_2 sending 20
774+
sender_2 sending 21
775+
receiver_1 received 10
776+
receiver_1 received 11
777+
receiver_1 received 12
778+
receiver_1 received 20
779+
receiver_1 received 21
780+
receiver_2 received 10
781+
receiver_2 received 11
782+
receiver_2 received 12
783+
receiver_2 received 20
784+
receiver_2 received 21
785+
```
786+
"""
787+
788+
def __new__(
789+
cls,
790+
name: str,
791+
resend_latest: bool = False,
792+
limit: int = 50,
793+
warn_on_overflow: bool = True,
794+
) -> BroadcastChannel[ChannelMessageT]:
795+
"""Create a new BroadcastChannel instance.
796+
797+
Args:
798+
name: The name of the channel. This is for logging purposes, and it will be
799+
shown in the string representation of the channel.
800+
resend_latest: When True, every time a new receiver is created with
801+
`new_receiver`, the last message seen by the channel will be sent to the
802+
new receiver automatically. This allows new receivers on slow streams to
803+
get the latest message as soon as they are created, without having to
804+
wait for the next message on the channel to arrive. It is safe to be
805+
set in data/reporting channels, but is not recommended for use in
806+
channels that stream control instructions.
807+
limit: Number of messages the receivers can hold in their buffers.
808+
warn_on_overflow: Whether to log a warning when a receiver's buffer is full
809+
and a message is dropped.
810+
811+
Returns:
812+
A new BroadcastChannel instance that can be destructured into an initial
813+
sender and receiver.
814+
"""
815+
channel = Broadcast[ChannelMessageT](
816+
name=name, resend_latest=resend_latest, auto_close=True
817+
)
818+
return tuple.__new__(
819+
cls,
820+
(
821+
channel.new_sender(),
822+
channel.new_receiver(
823+
name=f"{name}_receiver",
824+
limit=limit,
825+
warn_on_overflow=warn_on_overflow,
826+
),
827+
),
828+
)

src/frequenz/channels/_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def subscribe(self) -> Receiver[SenderMessageT_contra]:
122122
"""Subscribe to this sender.
123123
124124
Returns:
125-
A new sender that sends messages to the same channel as this sender.
125+
A new receiver attached to this sender's channel.
126126
"""
127127

128128

0 commit comments

Comments
 (0)