Skip to content

Commit 3bce4c2

Browse files
Utilize response from data_track_stream_read request
1 parent 84a60c1 commit 3bce4c2

1 file changed

Lines changed: 27 additions & 8 deletions

File tree

livekit-rtc/livekit/rtc/data_track.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from __future__ import annotations
1616

1717
from dataclasses import dataclass
18-
from typing import AsyncIterator, Optional
18+
from typing import AsyncIterator, NoReturn, Optional
1919

2020
from ._ffi_client import FfiClient, FfiHandle
2121
from ._proto import ffi_pb2 as proto_ffi
@@ -231,7 +231,10 @@ async def __anext__(self) -> DataTrackFrame:
231231
if self._closed:
232232
raise StopAsyncIteration
233233

234-
self._send_read_request()
234+
eos = self._send_read_request()
235+
if eos is not None:
236+
self._handle_eos(eos)
237+
235238
event: proto_ffi.FfiEvent = await self._queue.get()
236239
stream_event = event.data_track_stream_event
237240
detail = stream_event.WhichOneof("detail")
@@ -246,18 +249,34 @@ async def __anext__(self) -> DataTrackFrame:
246249
user_timestamp=user_ts,
247250
)
248251
elif detail == "eos":
249-
self._close()
250-
if stream_event.eos.HasField("error"):
251-
raise SubscribeDataTrackError(stream_event.eos.error.message)
252-
raise StopAsyncIteration
252+
self._handle_eos(stream_event.eos)
253253
else:
254254
self._close()
255255
raise StopAsyncIteration
256256

257-
def _send_read_request(self) -> None:
257+
def _send_read_request(self) -> Optional[proto_data_track.DataTrackStreamEOS]:
258258
req = proto_ffi.FfiRequest()
259259
req.data_track_stream_read.stream_handle = self._ffi_handle.handle
260-
FfiClient.instance.request(req)
260+
resp = FfiClient.instance.request(req)
261+
return self._read_response_eos(resp.data_track_stream_read)
262+
263+
@staticmethod
264+
def _read_response_eos(
265+
read_response: proto_data_track.DataTrackStreamReadResponse,
266+
) -> Optional[proto_data_track.DataTrackStreamEOS]:
267+
try:
268+
if not read_response.HasField("eos"):
269+
return None
270+
except ValueError:
271+
return None
272+
273+
return getattr(read_response, "eos")
274+
275+
def _handle_eos(self, eos: proto_data_track.DataTrackStreamEOS) -> NoReturn:
276+
self._close()
277+
if eos.HasField("error"):
278+
raise SubscribeDataTrackError(eos.error.message)
279+
raise StopAsyncIteration
261280

262281
def _close(self) -> None:
263282
if not self._closed:

0 commit comments

Comments
 (0)