|
9 | 9 | from twirp.context import Context |
10 | 10 |
|
11 | 11 | from getstream.utils import StreamAsyncIOEventEmitter |
| 12 | +from getstream.video.rtc.coordinator.ws import StreamAPIWS |
| 13 | +from getstream.video.rtc.pb.stream.video.sfu.event import events_pb2 |
12 | 14 | from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 |
13 | 15 | from getstream.video.rtc.pb.stream.video.sfu.signal_rpc import signal_pb2 |
14 | | -from getstream.video.rtc.twirp_client_wrapper import SignalClient |
| 16 | +from getstream.video.rtc.twirp_client_wrapper import SfuRpcError, SignalClient |
15 | 17 |
|
16 | 18 | from getstream.video.call import Call |
17 | 19 | from getstream.video.rtc.connection_utils import ( |
|
21 | 23 | connect_websocket, |
22 | 24 | join_call, |
23 | 25 | ) |
| 26 | +from getstream.video.rtc.track_util import ( |
| 27 | + fix_sdp_msid_semantic, |
| 28 | + parse_track_stream_mapping, |
| 29 | +) |
24 | 30 | from getstream.video.rtc.network_monitor import NetworkMonitor |
25 | 31 | from getstream.video.rtc.recording import RecordingManager |
26 | 32 | from getstream.video.rtc.participants import ParticipantsState |
@@ -72,6 +78,7 @@ def __init__( |
72 | 78 | self._recording_manager: RecordingManager = RecordingManager() |
73 | 79 | self._network_monitor: NetworkMonitor = NetworkMonitor(self) |
74 | 80 | self._reconnector: ReconnectionManager = ReconnectionManager(self) |
| 81 | + logger.info(f"VIVEK subscription_config: {subscription_config}") |
75 | 82 | self._subscription_manager: SubscriptionManager = SubscriptionManager( |
76 | 83 | self, subscription_config |
77 | 84 | ) |
@@ -124,48 +131,57 @@ async def _on_ice_trickle(self, event): |
124 | 131 | except Exception as e: |
125 | 132 | logger.debug(f"Error handling ICE trickle: {e}") |
126 | 133 |
|
127 | | - async def _on_subscriber_offer(self, event): |
128 | | - """Handle subscriber offer from SFU.""" |
129 | | - logger.info(f"Received subscriber offer: ice_restart={event.ice_restart}") |
130 | | - |
131 | | - try: |
132 | | - # Ensure we have a subscriber peer connection |
133 | | - if not self.subscriber_pc: |
134 | | - await self._peer_manager.setup_subscriber() |
135 | | - |
136 | | - # Parse SDP to extract track-to-stream mapping |
137 | | - self._extract_track_stream_mapping(event.sdp) |
| 134 | + async def _on_subscriber_offer(self, event: events_pb2.SubscriberOffer): |
| 135 | + logger.info("Subscriber offer received") |
138 | 136 |
|
139 | | - # Handle ICE restart if needed |
140 | | - if event.ice_restart: |
141 | | - logger.info("Restarting ICE for subscriber") |
142 | | - await self.subscriber_pc.restartIce() |
| 137 | + await self.subscriber_negotiation_lock.acquire() |
143 | 138 |
|
144 | | - # Set remote description with the SFU's offer |
| 139 | + try: |
| 140 | + # Fix any invalid msid-semantic format in the SDP |
| 141 | + fixed_sdp = fix_sdp_msid_semantic(event.sdp) |
| 142 | + # Parse SDP to create track_id to stream_id mapping |
| 143 | + self.participants_state.set_track_stream_mapping( |
| 144 | + parse_track_stream_mapping(fixed_sdp) |
| 145 | + ) |
| 146 | + # The SDP offer from the SFU might already contain candidates (trickled) |
| 147 | + # or have a different structure. We set it as the remote description. |
| 148 | + # The aiortc library handles merging and interpretation. |
145 | 149 | remote_description = aiortc.RTCSessionDescription( |
146 | | - type="offer", sdp=event.sdp |
| 150 | + type="offer", sdp=fixed_sdp |
147 | 151 | ) |
| 152 | + logger.debug(f"""Setting remote description with SDP: |
| 153 | + {remote_description.sdp}""") |
148 | 154 | await self.subscriber_pc.setRemoteDescription(remote_description) |
149 | 155 |
|
150 | | - # Create and set local answer |
| 156 | + # Create the answer based on the remote offer (which includes our candidates) |
151 | 157 | answer = await self.subscriber_pc.createAnswer() |
| 158 | + # Set the local description. aiortc will manage the SDP content. |
152 | 159 | await self.subscriber_pc.setLocalDescription(answer) |
153 | 160 |
|
154 | | - # Send answer back to SFU |
155 | | - response = await self.twirp_signaling_client.SendAnswer( |
156 | | - ctx=self.twirp_context, |
157 | | - request=signal_pb2.SendAnswerRequest( |
158 | | - session_id=self.session_id, |
159 | | - peer_type=models_pb2.PEER_TYPE_SUBSCRIBER, |
160 | | - sdp=self.subscriber_pc.localDescription.sdp, |
161 | | - ), |
162 | | - server_path_prefix="", |
| 161 | + logger.info( |
| 162 | + f"""Sending answer with local description: |
| 163 | + {self.subscriber_pc.localDescription.sdp}""" |
163 | 164 | ) |
164 | | - logger.info(f"Sent subscriber answer: {response}") |
165 | 165 |
|
166 | | - except Exception as e: |
167 | | - logger.error(f"Error handling subscriber offer: {e}") |
168 | | - raise |
| 166 | + try: |
| 167 | + await self.twirp_signaling_client.SendAnswer( |
| 168 | + ctx=self.twirp_context, |
| 169 | + request=signal_pb2.SendAnswerRequest( |
| 170 | + peer_type=models_pb2.PEER_TYPE_SUBSCRIBER, |
| 171 | + sdp=self.subscriber_pc.localDescription.sdp, |
| 172 | + session_id=self.session_id, |
| 173 | + ), |
| 174 | + server_path_prefix="", # Note: Our wrapper doesn't need this, underlying client handles prefix |
| 175 | + ) |
| 176 | + logger.info("Subscriber answer sent successfully.") |
| 177 | + except SfuRpcError as e: |
| 178 | + logger.error(f"Failed to send subscriber answer: {e}") |
| 179 | + # Decide how to handle: maybe close connection, notify user, etc. |
| 180 | + # For now, just log the error. |
| 181 | + except Exception as e: |
| 182 | + logger.error(f"Unexpected error sending subscriber answer: {e}") |
| 183 | + finally: |
| 184 | + self.subscriber_negotiation_lock.release() |
169 | 185 |
|
170 | 186 | def _extract_track_stream_mapping(self, sdp: str): |
171 | 187 | """Extract track-to-stream mapping from SDP.""" |
@@ -242,6 +258,8 @@ async def _connect_internal( |
242 | 258 | # Use provided session_id or current one |
243 | 259 | current_session_id = session_id or self.session_id |
244 | 260 |
|
| 261 | + await self._peer_manager.setup_subscriber() |
| 262 | + |
245 | 263 | # Step 3: Connect to WebSocket |
246 | 264 | try: |
247 | 265 | self._ws_client, sfu_event = await connect_websocket( |
@@ -290,20 +308,14 @@ async def _connect_internal( |
290 | 308 | self.twirp_context = Context(headers={"authorization": token}) |
291 | 309 |
|
292 | 310 | # Step 5: Create coordinator websocket (temporarily disabled to test) |
293 | | - # user_token = self.call.client.stream.create_token(user_id=self.user_id) |
294 | | - # self._coordinator_ws_client = StreamAPIWS( |
295 | | - # api_key=self.call.client.stream.api_key, |
296 | | - # token=user_token, |
297 | | - # user_details={"id": self.user_id}, |
298 | | - # healthcheck_interval=15.0, # Send heartbeat every 15 seconds instead of 25 |
299 | | - # healthcheck_timeout=20.0, # Expect server messages within 20 seconds instead of 30 |
300 | | - # ) |
301 | | - # self._coordinator_ws_client.on_wildcard("*", _log_event) |
302 | | - # await self._coordinator_ws_client.connect() |
303 | | - self._coordinator_ws_client = None # Temporarily disable coordinator connection |
304 | | - |
305 | | - # Step 6: Setup subscriber peer connection to receive incoming tracks |
306 | | - await self._peer_manager.setup_subscriber() |
| 311 | + user_token = self.call.client.stream.create_token(user_id=self.user_id) |
| 312 | + self._coordinator_ws_client = StreamAPIWS( |
| 313 | + api_key=self.call.client.stream.api_key, |
| 314 | + token=user_token, |
| 315 | + user_details={"id": self.user_id}, |
| 316 | + ) |
| 317 | + self._coordinator_ws_client.on_wildcard("*", _log_event) |
| 318 | + await self._coordinator_ws_client.connect() |
307 | 319 |
|
308 | 320 | # Mark as connected |
309 | 321 | self.running = True |
|
0 commit comments