Skip to content

Commit 428eec0

Browse files
committed
Expose strongly-typed Senders and Receivers from Broadcast
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 37bd825 commit 428eec0

2 files changed

Lines changed: 13 additions & 13 deletions

File tree

src/frequenz/channels/_broadcast.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
211211
"""The number of senders attached to this channel."""
212212

213213
self._receivers: dict[
214-
int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
214+
int, weakref.ReferenceType[BroadcastReceiver[ChannelMessageT]]
215215
] = {}
216216
"""The receivers attached to the channel, indexed by their hash()."""
217217

@@ -272,13 +272,13 @@ async def close(self) -> None: # noqa: D402
272272
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
273273
return await self.aclose()
274274

275-
def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]:
275+
def new_sender(self) -> BroadcastSender[ChannelMessageT]:
276276
"""Return a new sender attached to this channel."""
277-
return _Sender(self)
277+
return BroadcastSender(self)
278278

279279
def new_receiver(
280280
self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True
281-
) -> Receiver[ChannelMessageT]:
281+
) -> BroadcastReceiver[ChannelMessageT]:
282282
"""Return a new receiver attached to this channel.
283283
284284
Broadcast receivers have their own buffer, and when messages are not
@@ -294,7 +294,7 @@ def new_receiver(
294294
Returns:
295295
A new receiver attached to this channel.
296296
"""
297-
recv: _Receiver[ChannelMessageT] = _Receiver(
297+
recv: BroadcastReceiver[ChannelMessageT] = BroadcastReceiver(
298298
self, name=name, limit=limit, warn_on_overflow=warn_on_overflow
299299
)
300300
self._receivers[hash(recv)] = weakref.ref(recv)
@@ -320,7 +320,7 @@ def __repr__(self) -> str:
320320
_T = TypeVar("_T")
321321

322322

323-
class _Sender(ClonableSubscribableSender[_T]):
323+
class BroadcastSender(ClonableSubscribableSender[_T]):
324324
"""A sender to send messages to the broadcast channel.
325325
326326
Should not be created directly, but through the
@@ -400,17 +400,17 @@ def __del__(self) -> None:
400400
self._channel._sender_count -= 1
401401

402402
@override
403-
def clone(self) -> _Sender[_T]:
403+
def clone(self) -> BroadcastSender[_T]:
404404
"""Return a clone of this sender."""
405-
return _Sender(self._channel)
405+
return BroadcastSender(self._channel)
406406

407407
@override
408408
def subscribe(
409409
self,
410410
name: str | None = None,
411411
limit: int = 50,
412412
warn_on_overflow: bool = True,
413-
) -> Receiver[_T]:
413+
) -> BroadcastReceiver[_T]:
414414
"""Return a new receiver attached to this sender's channel."""
415415
return self._channel.new_receiver(
416416
name=name, limit=limit, warn_on_overflow=warn_on_overflow
@@ -425,7 +425,7 @@ def __repr__(self) -> str:
425425
return f"{type(self).__name__}({self._channel!r})"
426426

427427

428-
class _Receiver(Receiver[_T]):
428+
class BroadcastReceiver(Receiver[_T]):
429429
"""A receiver to receive messages from the broadcast channel.
430430
431431
Should not be created directly, but through the

tests/test_broadcast.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ async def test_broadcast_after_close() -> None:
107107
async def test_broadcast_overflow() -> None:
108108
"""Ensure messages sent to full broadcast receivers get dropped."""
109109
from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel
110-
_Receiver,
110+
BroadcastReceiver,
111111
)
112112

113113
bcast: Broadcast[int] = Broadcast(name="meter_5")
@@ -117,9 +117,9 @@ async def test_broadcast_overflow() -> None:
117117
sender = bcast.new_sender()
118118

119119
big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size)
120-
assert isinstance(big_receiver, _Receiver)
120+
assert isinstance(big_receiver, BroadcastReceiver)
121121
small_receiver = bcast.new_receiver(limit=small_recv_size)
122-
assert isinstance(small_receiver, _Receiver)
122+
assert isinstance(small_receiver, BroadcastReceiver)
123123

124124
async def drain_receivers() -> tuple[int, int]:
125125
big_sum = 0

0 commit comments

Comments
 (0)