Skip to content

Commit 56c5f5f

Browse files
committed
Rename type
1 parent 2f674bb commit 56c5f5f

3 files changed

Lines changed: 20 additions & 20 deletions

File tree

livekit-rtc/livekit/rtc/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
from .data_track import (
112112
LocalDataTrack,
113113
RemoteDataTrack,
114-
DataTrackSubscription,
114+
DataTrackStream,
115115
DataTrackFrame,
116116
DataTrackInfo,
117117
PushFrameError,
@@ -197,7 +197,7 @@
197197
"FrameProcessor",
198198
"LocalDataTrack",
199199
"RemoteDataTrack",
200-
"DataTrackSubscription",
200+
"DataTrackStream",
201201
"DataTrackFrame",
202202
"DataTrackInfo",
203203
"PushFrameError",

livekit-rtc/livekit/rtc/data_track.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,15 @@ def publisher_identity(self) -> str:
182182
"""Identity of the participant who published the track."""
183183
return self._publisher_identity
184184

185-
def subscribe(self, *, buffer_size: Optional[int] = None) -> DataTrackSubscription:
185+
def subscribe(self, *, buffer_size: Optional[int] = None) -> DataTrackStream:
186186
"""Subscribes to the data track to receive frames.
187187
188188
Args:
189189
buffer_size: Maximum number of received frames to buffer internally.
190190
When ``None``, the default buffer size is used.
191191
Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one.
192192
193-
Returns a :class:`DataTrackSubscription` that yields
193+
Returns a :class:`DataTrackStream` that yields
194194
:class:`DataTrackFrame` instances as they arrive. If the
195195
subscription encounters an error, it is raised as
196196
:class:`SubscribeDataTrackError` when iteration ends.
@@ -204,7 +204,7 @@ def subscribe(self, *, buffer_size: Optional[int] = None) -> DataTrackSubscripti
204204
req.subscribe_data_track.options.CopyFrom(opts)
205205

206206
resp = FfiClient.instance.request(req)
207-
return DataTrackSubscription(resp.subscribe_data_track.subscription)
207+
return DataTrackStream(resp.subscribe_data_track.stream)
208208

209209
def is_published(self) -> bool:
210210
"""Whether or not the track is still published."""
@@ -221,29 +221,29 @@ def __repr__(self) -> str:
221221
)
222222

223223

224-
class DataTrackSubscription:
224+
class DataTrackStream:
225225
"""An active subscription to a remote data track.
226226
227227
Use as an async iterator to receive frames::
228228
229-
subscription = remote_track.subscribe()
230-
async for frame in subscription:
229+
stream = remote_track.subscribe()
230+
async for frame in stream:
231231
process(frame.payload)
232232
233-
Dropping or closing the subscription unsubscribes from the track.
233+
Dropping or closing the stream unsubscribes from the track.
234234
235235
If subscribing to the track fails, :class:`SubscribeDataTrackError`
236236
is raised when iteration ends instead of a normal ``StopAsyncIteration``.
237237
"""
238238

239-
def __init__(self, owned_info: proto_data_track.OwnedDataTrackSubscription) -> None:
239+
def __init__(self, owned_info: proto_data_track.OwnedDataTrackStream) -> None:
240240
self._ffi_handle = FfiHandle(owned_info.handle.id)
241241
handle_id = owned_info.handle.id
242242

243243
self._queue = FfiClient.instance.queue.subscribe(
244244
filter_fn=lambda e: (
245-
e.WhichOneof("message") == "data_track_subscription_event"
246-
and e.data_track_subscription_event.subscription_handle == handle_id
245+
e.WhichOneof("message") == "data_track_stream_event"
246+
and e.data_track_stream_event.stream_handle == handle_id
247247
),
248248
)
249249
self._closed = False
@@ -257,11 +257,11 @@ async def __anext__(self) -> DataTrackFrame:
257257

258258
self._send_read_request()
259259
event: proto_ffi.FfiEvent = await self._queue.get()
260-
sub_event = event.data_track_subscription_event
261-
detail = sub_event.WhichOneof("detail")
260+
stream_event = event.data_track_stream_event
261+
detail = stream_event.WhichOneof("detail")
262262

263263
if detail == "frame_received":
264-
proto_frame = sub_event.frame_received.frame
264+
proto_frame = stream_event.frame_received.frame
265265
user_ts: Optional[int] = None
266266
if proto_frame.HasField("user_timestamp"):
267267
user_ts = proto_frame.user_timestamp
@@ -271,16 +271,16 @@ async def __anext__(self) -> DataTrackFrame:
271271
)
272272
elif detail == "eos":
273273
self._close()
274-
if sub_event.eos.HasField("error"):
275-
raise SubscribeDataTrackError(sub_event.eos.error)
274+
if stream_event.eos.HasField("error"):
275+
raise SubscribeDataTrackError(stream_event.eos.error)
276276
raise StopAsyncIteration
277277
else:
278278
self._close()
279279
raise StopAsyncIteration
280280

281281
def _send_read_request(self) -> None:
282282
req = proto_ffi.FfiRequest()
283-
req.data_track_subscription_read.subscription_handle = self._ffi_handle.handle
283+
req.data_track_stream_read.stream_handle = self._ffi_handle.handle
284284
FfiClient.instance.request(req)
285285

286286
def _close(self) -> None:

tests/rtc/test_e2e.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ def on_data_track_unpublished(sid: str):
488488
assert remote_track.publisher_identity == PUBLISHER_IDENTITY
489489
assert remote_track.is_published()
490490

491-
subscription = remote_track.subscribe()
491+
stream = remote_track.subscribe()
492492

493493
async def push_frames():
494494
for i in range(FRAME_COUNT):
@@ -502,7 +502,7 @@ async def push_frames():
502502
async def publish_and_receive():
503503
push_task = asyncio.create_task(push_frames())
504504
recv_count = 0
505-
async for frame in subscription:
505+
async for frame in stream:
506506
first_byte = frame.payload[0]
507507
assert all(b == first_byte for b in frame.payload), "Payload bytes are not uniform"
508508
assert len(frame.payload) == PAYLOAD_SIZE

0 commit comments

Comments
 (0)