@@ -207,6 +207,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
207207 self ._recv_cv : Condition = Condition ()
208208 """The condition to wait for data in the channel's buffer."""
209209
210+ self ._sender_count : int = 0
211+ """The number of senders attached to this channel."""
212+
210213 self ._receivers : dict [
211214 int , weakref .ReferenceType [_Receiver [ChannelMessageT ]]
212215 ] = {}
@@ -337,6 +340,13 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
337340 self ._closed : bool = False
338341 """Whether this sender is closed."""
339342
343+ self ._channel ._sender_count += 1
344+
345+ @property
346+ def sender_count (self ) -> int :
347+ """Return the number of open senders attached to this sender's channel."""
348+ return self ._channel ._sender_count # pylint: disable=protected-access
349+
340350 @override
341351 async def send (self , message : _T , / ) -> None :
342352 """Send a message to all broadcast receivers.
@@ -379,7 +389,15 @@ async def aclose(self) -> None:
379389 attempt to send a message through a closed sender will raise a
380390 [SenderClosedError][frequenz.channels.SenderClosedError].
381391 """
392+ if self ._closed :
393+ return
382394 self ._closed = True
395+ self ._channel ._sender_count -= 1
396+
397+ def __del__ (self ) -> None :
398+ """Clean up this sender."""
399+ if not self ._closed :
400+ self ._channel ._sender_count -= 1
383401
384402 @override
385403 def clone (self ) -> _Sender [_T ]:
0 commit comments