Skip to content

Commit 0b6493a

Browse files
Add support for local stream processing to webrtc sdk (#2200)
* Add support for local stream processing to webrtc sdk * fixed async blocking issue * added timeout rtsp * Add paced track for rtsp streams to address corrupted frames --------- Co-authored-by: Rafel Bennasar Crespi <253519461+rafel-roboflow@users.noreply.github.com>
1 parent ff64c93 commit 0b6493a

3 files changed

Lines changed: 245 additions & 0 deletions

File tree

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""
2+
Minimal sample using the SDK's WebRTC namespace to stream frames from a local
3+
RTSP or RTMP camera to an inference server with WebRTC worker enabled.
4+
5+
Unlike RTSPSource (where the server captures the stream), LocalStreamSource
6+
captures the stream locally and sends frames to the server. Use this when:
7+
- The camera is only accessible from your local network
8+
- The server cannot reach the camera directly
9+
10+
Supported protocols:
11+
- RTSP: rtsp://host/stream or rtsps://host/stream
12+
- RTMP: rtmp://host/stream or rtmps://host/stream
13+
14+
Usage:
15+
python examples/webrtc_sdk/local_stream_basic.py \\
16+
--stream-url rtsp://camera.local/stream \\
17+
--workspace-name <your_workspace> \\
18+
--workflow-id <your_workflow_id> \\
19+
[--api-url http://localhost:9001] \\
20+
[--api-key <ROBOFLOW_API_KEY>] \\
21+
[--stream-output <output_name>] \\
22+
[--data-output <output_name>]
23+
24+
Press 'q' in the preview window to exit.
25+
"""
26+
import argparse
27+
28+
import av
29+
import cv2
30+
31+
from inference_sdk import InferenceHTTPClient
32+
from inference_sdk.webrtc import LocalStreamSource, StreamConfig, VideoMetadata
33+
34+
# Suppress FFmpeg warnings from PyAV
35+
av.logging.set_level(av.logging.ERROR)
36+
37+
38+
def parse_args() -> argparse.Namespace:
39+
p = argparse.ArgumentParser("WebRTC SDK local_stream_basic")
40+
p.add_argument(
41+
"--stream-url",
42+
required=True,
43+
help="Stream URL (rtsp://, rtsps://, rtmp://, or rtmps://)",
44+
)
45+
p.add_argument("--api-url", default="https://serverless.roboflow.com")
46+
p.add_argument("--workspace-name", required=True)
47+
p.add_argument("--workflow-id", required=True)
48+
p.add_argument("--image-input-name", default="image")
49+
p.add_argument("--api-key", default=None)
50+
p.add_argument(
51+
"--stream-output",
52+
default=None,
53+
help="Name of the workflow output to stream (e.g., 'image_output')",
54+
)
55+
p.add_argument(
56+
"--data-output",
57+
default=None,
58+
help="Name of the workflow output to receive via data channel",
59+
)
60+
return p.parse_args()
61+
62+
63+
def main() -> None:
64+
args = parse_args()
65+
client = InferenceHTTPClient.init(api_url=args.api_url, api_key=args.api_key)
66+
67+
# Prepare source - captures stream locally and sends to server
68+
source = LocalStreamSource(args.stream_url)
69+
70+
# Prepare config
71+
stream_output = [args.stream_output] if args.stream_output else []
72+
data_output = [args.data_output] if args.data_output else []
73+
config = StreamConfig(stream_output=stream_output, data_output=data_output)
74+
75+
# Create streaming session
76+
session = client.webrtc.stream(
77+
source=source,
78+
workflow=args.workflow_id,
79+
workspace=args.workspace_name,
80+
image_input=args.image_input_name,
81+
config=config,
82+
)
83+
84+
# Register frame handler
85+
@session.on_frame
86+
def show_frame(frame, metadata):
87+
cv2.imshow("WebRTC SDK - Local Stream", frame)
88+
if cv2.waitKey(1) & 0xFF == ord("q"):
89+
session.close() # Close session and cleanup resources
90+
91+
# Register data handlers
92+
# Global handler (receives entire serialized_output_data dict + metadata)
93+
@session.on_data()
94+
def on_message(data: dict, metadata: VideoMetadata):
95+
print(f"Frame {metadata.frame_id}: {data}")
96+
97+
# Field-specific handler example (uncomment and customize based on your workflow):
98+
# @session.on_data("predictions")
99+
# def on_predictions(predictions: dict, metadata: VideoMetadata):
100+
# print(f"Frame {metadata.frame_id} predictions: {predictions}")
101+
102+
# Run the session (auto-starts, blocks until close() is called or stream ends)
103+
# Automatically closes on exception or when stream ends
104+
session.run()
105+
106+
107+
if __name__ == "__main__":
108+
main()

inference_sdk/webrtc/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .config import StreamConfig # noqa: F401
55
from .session import VideoMetadata, WebRTCSession # noqa: F401
66
from .sources import ( # noqa: F401
7+
LocalStreamSource,
78
ManualSource,
89
MJPEGSource,
910
RTSPSource,
@@ -22,6 +23,7 @@
2223
# Source classes
2324
"StreamSource",
2425
"WebcamSource",
26+
"LocalStreamSource",
2527
"RTSPSource",
2628
"MJPEGSource",
2729
"VideoFileSource",

inference_sdk/webrtc/sources.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55
"""
66

77
import asyncio
8+
import threading
9+
import time
810
from abc import ABC, abstractmethod
911
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple
1012

1113
import cv2
1214
import numpy as np
1315
from aiortc import RTCPeerConnection, VideoStreamTrack
16+
from aiortc.contrib.media import MediaPlayer
17+
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_PTIME, VIDEO_TIME_BASE
1418
from av import VideoFrame
1519

1620
from inference_sdk.http.errors import InvalidParameterError
@@ -181,6 +185,137 @@ async def cleanup(self) -> None:
181185
self._track.release()
182186

183187

188+
class _PacedTrack(VideoStreamTrack):
189+
"""Wraps a source track and paces frame delivery like VideoStreamTrack.
190+
191+
MediaPlayer's PlayerStreamTrack does NOT pace RTSP frames (RTSP is in
192+
REAL_TIME_FORMATS so _throttle_playback is False). This means frames
193+
are pulled from FFmpeg and forwarded to the RTP sender as fast as
194+
possible, causing packet bursts that overflow the receiver's jitter
195+
buffer.
196+
197+
This wrapper adds the same ~33 ms sleep between frames that the base
198+
VideoStreamTrack uses, preventing bursts.
199+
"""
200+
201+
def __init__(self, source): # noqa: ANN001
202+
super().__init__()
203+
self._source = source
204+
self._start_time: float = 0.0
205+
self._pts = 0
206+
207+
async def recv(self) -> VideoFrame:
208+
from aiortc.mediastreams import MediaStreamError
209+
210+
if self.readyState != "live":
211+
raise MediaStreamError
212+
213+
frame = await self._source.recv()
214+
215+
# Pace delivery: sleep until the next frame slot (~33 ms apart)
216+
if self._pts == 0:
217+
self._start_time = time.time()
218+
else:
219+
wait = self._start_time + (self._pts / VIDEO_CLOCK_RATE) - time.time()
220+
if wait > 0:
221+
await asyncio.sleep(wait)
222+
223+
frame.pts = self._pts
224+
frame.time_base = VIDEO_TIME_BASE
225+
self._pts += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
226+
return frame
227+
228+
def stop(self) -> None:
229+
super().stop()
230+
self._source.stop()
231+
232+
233+
class LocalStreamSource(StreamSource):
234+
"""Stream source for locally captured RTSP/RTMP camera streams.
235+
236+
Unlike RTSPSource (where the server captures the RTSP stream), this source
237+
captures the stream locally using aiortc's MediaPlayer (FFmpeg-based) and
238+
sends frames to the server via WebRTC, similar to WebcamSource.
239+
240+
Supported protocols:
241+
- RTSP: rtsp://host/path or rtsps://host/path
242+
- RTMP: rtmp://host/path or rtmps://host/path
243+
244+
Use this when:
245+
- The camera is only accessible from the client machine (e.g., local network)
246+
- You want to preprocess frames before sending to the server
247+
- The server cannot access the stream URL directly
248+
"""
249+
250+
# Supported URL schemes
251+
SUPPORTED_SCHEMES = ("rtsp://", "rtsps://", "rtmp://", "rtmps://")
252+
253+
def __init__(self, url: str):
254+
"""Initialize local stream source.
255+
256+
Args:
257+
url: Stream URL. Supported formats:
258+
- RTSP: "rtsp://host/stream" or "rtsps://host/stream"
259+
- RTMP: "rtmp://host/stream" or "rtmps://host/stream"
260+
Credentials can be included: "rtsp://user:pass@host/stream"
261+
"""
262+
if not url.startswith(self.SUPPORTED_SCHEMES):
263+
raise InvalidParameterError(
264+
f"Invalid stream URL: {url}. "
265+
f"Must start with one of: {', '.join(self.SUPPORTED_SCHEMES)}"
266+
)
267+
self.url = url
268+
self._player: Optional[MediaPlayer] = None
269+
270+
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
271+
"""Create MediaPlayer for stream and add video track to peer connection."""
272+
273+
if self.url.startswith(("rtsp://", "rtsps://")):
274+
self._player = await asyncio.to_thread(
275+
MediaPlayer,
276+
self.url,
277+
format="rtsp",
278+
options={
279+
"rtsp_transport": "tcp",
280+
"rtsp_flags": "prefer_tcp",
281+
"stimeout": "5000000", # 5s RTSP socket timeout
282+
"timeout": "5000000", # 5s TCP connection timeout
283+
},
284+
)
285+
else:
286+
self._player = await asyncio.to_thread(
287+
MediaPlayer,
288+
self.url,
289+
options={
290+
"rw_timeout": "5000000", # 5s socket timeout
291+
},
292+
)
293+
294+
if self._player.video is None:
295+
raise RuntimeError(f"No video track available from stream: {self.url}")
296+
297+
# Wrap in a pacing track — MediaPlayer does not pace RTSP frames
298+
# (RTSP is in REAL_TIME_FORMATS so _throttle_playback is False).
299+
# Without pacing the RTP sender bursts all packets at once,
300+
# overflowing the receiver's jitter buffer → truncated VP8 frames.
301+
self._paced_track = _PacedTrack(self._player.video)
302+
pc.addTrack(self._paced_track)
303+
304+
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
305+
"""Return empty params - stream is captured locally, not by server."""
306+
return {}
307+
308+
async def cleanup(self) -> None:
309+
"""Stop the paced track and MediaPlayer."""
310+
if hasattr(self, "_paced_track") and self._paced_track:
311+
self._paced_track.stop()
312+
self._paced_track = None
313+
if self._player:
314+
if self._player.video:
315+
self._player.video.stop()
316+
self._player = None
317+
318+
184319
class RTSPSource(StreamSource):
185320
"""Stream source for RTSP camera streams.
186321

0 commit comments

Comments
 (0)