Skip to content

Commit 210c81c

Browse files
Merge pull request #1305 from roboflow/fix/improve-webrtc-stability
Improvements to webrtc stability
2 parents 697a8a0 + 478b69e commit 210c81c

7 files changed

Lines changed: 182 additions & 107 deletions

File tree

inference/core/interfaces/http/http_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,9 +1493,11 @@ async def initialise(request: InitialisePipelinePayload) -> CommandResponse:
14931493
async def initialise_webrtc_inference_pipeline(
14941494
request: InitialiseWebRTCPipelinePayload,
14951495
) -> CommandResponse:
1496+
logger.debug("Received initialise webrtc inference pipeline request")
14961497
resp = await self.stream_manager_client.initialise_webrtc_pipeline(
14971498
initialisation_request=request
14981499
)
1500+
logger.debug("Returning initialise webrtc inference pipeline response")
14991501
return resp
15001502

15011503
@app.post(

inference/core/interfaces/stream_manager/manager_app/app.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,13 @@ def _terminate_pipeline(
253253
)
254254
with PROCESSES_TABLE_LOCK:
255255
# termination ended
256-
pipeline = self._processes_table[pipeline_id]
257-
pipeline.is_terminating = False
256+
if pipeline_id not in self._processes_table:
257+
logger.warning(
258+
f"Pipeline {pipeline_id} already removed from processes table."
259+
)
260+
else:
261+
pipeline = self._processes_table[pipeline_id]
262+
pipeline.is_terminating = False
258263
serialised_response = prepare_response(
259264
request_id=request_id, response=response, pipeline_id=pipeline_id
260265
)
@@ -537,7 +542,10 @@ def spawn_managed_pipeline_process(
537542

538543

539544
def _get_process_memory_usage_mb(process: Process) -> int:
540-
return psutil.Process(process.pid).memory_info().rss / (1024 * 1024)
545+
try:
546+
return psutil.Process(process.pid).memory_info().rss / (1024 * 1024)
547+
except psutil.NoSuchProcess:
548+
return 0
541549

542550

543551
def start(expected_warmed_up_pipelines: int = 0) -> None:

inference/core/interfaces/stream_manager/manager_app/entities.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ class InitialiseWebRTCPipelinePayload(InitialisePipelinePayload):
108108
data_output: Optional[List[Optional[str]]] = Field(default_factory=list)
109109
webrtc_peer_timeout: float = 1
110110
webcam_fps: Optional[float] = None
111+
processing_timeout: float = 0.005
112+
fps_probe_frames: int = 10
113+
max_consecutive_timeouts: int = 30
114+
min_consecutive_on_time: int = 5
111115

112116

113117
class ConsumeResultsPayload(BaseModel):

inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ def start_loop(loop: asyncio.AbstractEventLoop):
254254
webrtc_offer = parsed_payload.webrtc_offer
255255
webrtc_turn_config = parsed_payload.webrtc_turn_config
256256
webcam_fps = parsed_payload.webcam_fps
257-
to_inference_queue = SyncAsyncQueue(loop=loop)
258-
from_inference_queue = SyncAsyncQueue(loop=loop)
257+
to_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
258+
from_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
259259

260260
stop_event = Event()
261261

@@ -265,21 +265,34 @@ def start_loop(loop: asyncio.AbstractEventLoop):
265265
webrtc_turn_config=webrtc_turn_config,
266266
to_inference_queue=to_inference_queue,
267267
from_inference_queue=from_inference_queue,
268-
webrtc_peer_timeout=parsed_payload.webrtc_peer_timeout,
269268
feedback_stop_event=stop_event,
270269
asyncio_loop=loop,
271270
webcam_fps=webcam_fps,
271+
max_consecutive_timeouts=parsed_payload.max_consecutive_timeouts,
272+
min_consecutive_on_time=parsed_payload.min_consecutive_on_time,
273+
processing_timeout=parsed_payload.processing_timeout,
274+
fps_probe_frames=parsed_payload.fps_probe_frames,
272275
),
273276
loop,
274277
)
275278
peer_connection: RTCPeerConnectionWithFPS = future.result()
276279

280+
self._responses_queue.put(
281+
(
282+
request_id,
283+
{
284+
STATUS_KEY: OperationStatus.SUCCESS,
285+
"sdp": peer_connection.localDescription.sdp,
286+
"type": peer_connection.localDescription.type,
287+
},
288+
)
289+
)
290+
277291
webrtc_producer = partial(
278292
WebRTCVideoFrameProducer,
279293
to_inference_queue=to_inference_queue,
280294
stop_event=stop_event,
281295
webrtc_video_transform_track=peer_connection.video_transform_track,
282-
webrtc_peer_timeout=parsed_payload.webrtc_peer_timeout,
283296
)
284297

285298
def webrtc_sink(
@@ -349,16 +362,6 @@ def webrtc_sink(
349362
decoding_buffer_size=parsed_payload.decoding_buffer_size,
350363
)
351364
self._inference_pipeline.start(use_main_thread=False)
352-
self._responses_queue.put(
353-
(
354-
request_id,
355-
{
356-
STATUS_KEY: OperationStatus.SUCCESS,
357-
"sdp": peer_connection.localDescription.sdp,
358-
"type": peer_connection.localDescription.type,
359-
},
360-
)
361-
)
362365
logger.info(f"WebRTC pipeline initialised. request_id={request_id}...")
363366
except (
364367
ValidationError,

0 commit comments

Comments
 (0)