Skip to content

Commit 2620374

Browse files
committed
Add BroadcastChannel and deprecate Broadcast class
The `BroadcastChannel` would return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 3febbe0 commit 2620374

3 files changed

Lines changed: 241 additions & 8 deletions

File tree

src/frequenz/channels/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"""
8181

8282
from ._anycast import Anycast
83-
from ._broadcast import Broadcast, BroadcastReceiver, BroadcastSender
83+
from ._broadcast import Broadcast, BroadcastChannel, BroadcastReceiver, BroadcastSender
8484
from ._exceptions import ChannelClosedError, ChannelError, Error
8585
from ._generic import (
8686
ChannelMessageT,
@@ -101,16 +101,26 @@
101101
select,
102102
selected_from,
103103
)
104-
from ._sender import Sender, SenderClosedError, SenderError
104+
from ._sender import (
105+
ClonableSender,
106+
ClonableSubscribableSender,
107+
Sender,
108+
SenderClosedError,
109+
SenderError,
110+
SubscribableSender,
111+
)
105112

106113
__all__ = [
107114
"Anycast",
108115
"Broadcast",
116+
"BroadcastChannel",
109117
"BroadcastReceiver",
110118
"BroadcastSender",
111119
"ChannelClosedError",
112120
"ChannelError",
113121
"ChannelMessageT",
122+
"ClonableSender",
123+
"ClonableSubscribableSender",
114124
"Error",
115125
"ErroredChannelT_co",
116126
"LatestValueCache",
@@ -130,6 +140,7 @@
130140
"SenderError",
131141
"SenderMessageT_co",
132142
"SenderMessageT_contra",
143+
"SubscribableSender",
133144
"UnhandledSelectedError",
134145
"merge",
135146
"select",

src/frequenz/channels/_broadcast.py

Lines changed: 227 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
_logger = logging.getLogger(__name__)
2222

2323

24+
@deprecated("Please use BroadcastChannel instead.")
2425
class Broadcast( # pylint: disable=too-many-instance-attributes
2526
Generic[ChannelMessageT]
2627
):
@@ -337,8 +338,8 @@ class BroadcastSender(ClonableSubscribableSender[_T]):
337338
"""A sender to send messages to the broadcast channel.
338339
339340
Should not be created directly, but through the
340-
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
341-
method.
341+
[BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone]
342+
method of an existing sender.
342343
"""
343344

344345
def __init__(self, channel: Broadcast[_T], /) -> None:
@@ -434,7 +435,17 @@ def subscribe(
434435
limit: int = 50,
435436
warn_on_overflow: bool = True,
436437
) -> BroadcastReceiver[_T]:
437-
"""Return a new receiver attached to this sender's channel."""
438+
"""Return a new receiver attached to this sender's channel.
439+
440+
Args:
441+
name: A name to identify the receiver in the logs.
442+
limit: Number of messages the receiver can hold in its buffer.
443+
warn_on_overflow: Whether to log a warning when the receiver's buffer is
444+
full and a message is dropped.
445+
446+
Returns:
447+
A new receiver attached to this sender's channel.
448+
"""
438449
return self._channel.new_receiver(
439450
name=name, limit=limit, warn_on_overflow=warn_on_overflow
440451
)
@@ -452,8 +463,8 @@ class BroadcastReceiver(Receiver[_T]):
452463
"""A receiver to receive messages from the broadcast channel.
453464
454465
Should not be created directly, but through the
455-
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
456-
method.
466+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
467+
method of an existing sender.
457468
"""
458469

459470
def __init__(
@@ -602,3 +613,214 @@ def __repr__(self) -> str:
602613
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
603614
f"{self._channel!r}):<id={id(self)!r}, used={len(self._q)!r}>"
604615
)
616+
617+
618+
class BroadcastChannel(
619+
tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]]
620+
):
621+
"""A channel that deliver all messages to all receivers.
622+
623+
# Description
624+
625+
[BroadcastChannel][frequenz.channels.BroadcastChannel]s can have multiple
626+
[senders][frequenz.channels.BroadcastSender] and multiple
627+
[receivers][frequenz.channels.BroadcastReceiver]. Each message sent through
628+
any of the senders will be received by all receivers.
629+
630+
<center>
631+
```bob
632+
.---------. msg1 msg1,msg2 .-----------.
633+
| Sender +------. .---------->| Receiver |
634+
'---------' | .----------. | '-----------'
635+
+----->| Channel +-----+
636+
.---------. | '----------' | .-----------.
637+
| Sender +------' '----------->| Receiver |
638+
'---------' msg2 msg1,msg2 '-----------'
639+
```
640+
</center>
641+
642+
!!! Note inline end "Characteristics"
643+
644+
* **Buffered:** Yes, with one buffer per receiver
645+
* **Buffer full policy:** Drop oldest message
646+
* **Multiple receivers:** Yes
647+
* **Multiple senders:** Yes
648+
* **Thread-safe:** No
649+
650+
This channel is buffered, and when messages are not being consumed fast
651+
enough and the buffer fills up, old messages will get dropped.
652+
653+
Each receiver has its own buffer, so messages will only be dropped for
654+
receivers that can't keep up with the senders, and not for the whole
655+
channel.
656+
657+
Instantiating this class will create a new broadcast channel, and return an
658+
initial sender and a receiver. Further senders and receivers can be created
659+
with the [BroadcastSender.clone()][frequenz.channels.BroadcastSender.clone],
660+
and
661+
[BroadcastSender.subscribe()][frequenz.channels.BroadcastSender.subscribe]
662+
methods respectively.
663+
664+
When a sender or a receiver is not needed anymore, it should be closed with
665+
[`aclose()`][frequenz.channels.BroadcastSender.aclose] or
666+
[`close()`][frequenz.channels.BroadcastReceiver.close]. This will prevent
667+
further attempts to [`send()`][frequenz.channels.BroadcastSender.send] data,
668+
and will allow receivers to drain the pending items on their queues, but
669+
after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls
670+
will raise a
671+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
672+
673+
When all senders of a channel are closed, all its receivers will be
674+
automatically closed, and vice versa.
675+
676+
This channel is useful, for example, to implement a pub/sub pattern, where
677+
multiple consumers can subscribe to a channel to receive all messages.
678+
679+
# Examples
680+
681+
Example: Send a few numbers to a receiver
682+
This is a very simple example that sends a few numbers from a single sender to
683+
a single receiver.
684+
685+
```python
686+
import asyncio
687+
688+
from frequenz.channels import BroadcastChannel, Sender
689+
690+
691+
async def send(sender: Sender[int]) -> None:
692+
for message in range(3):
693+
print(f"sending {message}")
694+
await sender.send(message)
695+
await sender.aclose()
696+
697+
698+
async def main() -> None:
699+
sender, receiver = BroadcastChannel[int](name="numbers")
700+
701+
async with asyncio.TaskGroup() as task_group:
702+
task_group.create_task(send(sender))
703+
for _ in range(3):
704+
message = await receiver.receive()
705+
print(f"received {message}")
706+
await asyncio.sleep(0.1) # sleep (or work) with the data
707+
708+
709+
asyncio.run(main())
710+
```
711+
712+
The output should look something like (although the sending and received might
713+
appear more interleaved):
714+
715+
```
716+
sending 0
717+
sending 1
718+
sending 2
719+
received 0
720+
received 1
721+
received 2
722+
```
723+
724+
Example: Send a few number from multiple senders to multiple receivers
725+
This is a more complex example that sends a few numbers from multiple senders to
726+
multiple receivers, using a small buffer to force the senders to block.
727+
728+
```python
729+
import asyncio
730+
731+
from frequenz.channels import BroadcastChannel, Receiver, ReceiverStoppedError, Sender
732+
733+
734+
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
735+
for message in range(start, stop):
736+
print(f"{name} sending {message}")
737+
await sender.send(message)
738+
await sender.aclose()
739+
740+
741+
async def recv(name: str, receiver: Receiver[int]) -> None:
742+
try:
743+
async for message in receiver:
744+
print(f"{name} received {message}")
745+
await asyncio.sleep(0.1) # sleep (or work) with the data
746+
except ReceiverStoppedError:
747+
pass
748+
749+
750+
async def main() -> None:
751+
sender_1, receiver_1 = BroadcastChannel[int](name="numbers")
752+
sender_2 = sender_1.clone()
753+
receiver_2 = sender_1.subscribe()
754+
async with asyncio.TaskGroup() as task_group:
755+
task_group.create_task(send("sender_1", sender_1, 10, 13))
756+
task_group.create_task(send("sender_2", sender_2, 20, 22))
757+
task_group.create_task(recv("receiver_1", receiver_1))
758+
task_group.create_task(recv("receiver_2", receiver_2))
759+
760+
761+
asyncio.run(main())
762+
```
763+
764+
The output should look something like this(although the sending and received
765+
might appear interleaved in a different way):
766+
767+
```
768+
sender_1 sending 10
769+
sender_1 sending 11
770+
sender_1 sending 12
771+
sender_2 sending 20
772+
sender_2 sending 21
773+
receiver_1 received 10
774+
receiver_1 received 11
775+
receiver_1 received 12
776+
receiver_1 received 20
777+
receiver_1 received 21
778+
receiver_2 received 10
779+
receiver_2 received 11
780+
receiver_2 received 12
781+
receiver_2 received 20
782+
receiver_2 received 21
783+
```
784+
"""
785+
786+
def __new__(
787+
cls,
788+
name: str,
789+
resend_latest: bool = False,
790+
limit: int = 50,
791+
warn_on_overflow: bool = True,
792+
) -> BroadcastChannel[ChannelMessageT]:
793+
"""Create a new BroadcastChannel instance.
794+
795+
Args:
796+
name: The name of the channel. This is for logging purposes, and it will be
797+
shown in the string representation of the channel.
798+
resend_latest: When True, every time a new receiver is created with
799+
`new_receiver`, the last message seen by the channel will be sent to the
800+
new receiver automatically. This allows new receivers on slow streams to
801+
get the latest message as soon as they are created, without having to
802+
wait for the next message on the channel to arrive. It is safe to be
803+
set in data/reporting channels, but is not recommended for use in
804+
channels that stream control instructions.
805+
limit: Number of messages the receivers can hold in their buffers.
806+
warn_on_overflow: Whether to log a warning when a receiver's buffer is full
807+
and a message is dropped.
808+
809+
Returns:
810+
A new BroadcastChannel instance that can be destructured into an initial
811+
sender and receiver.
812+
"""
813+
channel = Broadcast[ChannelMessageT](
814+
name=name, resend_latest=resend_latest, auto_close=True
815+
)
816+
return tuple.__new__(
817+
cls,
818+
(
819+
channel.new_sender(),
820+
channel.new_receiver(
821+
name=f"{name}_receiver",
822+
limit=limit,
823+
warn_on_overflow=warn_on_overflow,
824+
),
825+
),
826+
)

src/frequenz/channels/_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def subscribe(self) -> Receiver[SenderMessageT_contra]:
122122
"""Subscribe to this sender.
123123
124124
Returns:
125-
A new sender that sends messages to the same channel as this sender.
125+
A new receiver attached to this sender's channel.
126126
"""
127127

128128

0 commit comments

Comments
 (0)