Skip to content

Commit 1337848

Browse files
committed
Merge branch 'ws-coordinator' into webrtc
2 parents e4b716b + c2c7257 commit 1337848

20 files changed

Lines changed: 3806 additions & 2674 deletions
File renamed without changes.
Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
2+
# Design & Implementation Guide
3+
Streaming Py – WebSocket client for the Video Coordinator (`StreamAPIWS`)
4+
5+
---
6+
7+
## 1 Overview
8+
`StreamAPIWS` is an asynchronous WebSocket client that lets the SDK talk to Stream’s Video Coordinator at
9+
`wss://chat.stream-io-api.com/api/v2/connect?api_key=<API_KEY>`.
10+
11+
Key capabilities
12+
13+
• Authenticate immediately after the TCP/WS handshake
14+
• Emit all server events through a `pyee.AsyncIOEventEmitter` interface (`event.type` is the channel)
15+
• Maintain liveness with heart-beats (send every 25 s, expect any inbound message within 30 s)
16+
• Recover transparently from network drops using an exponential-back-off retry loop (max 5 attempts)
17+
• Expose an explicit `disconnect()` coroutine for graceful shutdown
18+
• Follow existing logging conventions (`logger = logging.getLogger(__name__)`, no global config)
19+
20+
The public surface is intentionally small:
21+
22+
```python
23+
ws_client = StreamAPIWS(api_key, token, user_id)
24+
connected_payload = await ws_client.connect() # raises if first msg.type == "error"
25+
26+
@ws_client.on("health.check")
27+
async def _(_: dict): ...
28+
...
29+
await ws_client.disconnect()
30+
```
31+
32+
---
33+
34+
## 2 Dependencies & Versions
35+
36+
| Purpose | Package | Min-version | Notes |
37+
|---------------------|----------------|-------------|-------|
38+
| Async WebSockets | websockets | 12 .x | Pure-Python, CPython-only OK |
39+
| Event dispatcher | pyee | 13 .x | Already optional in `webrtc` group |
40+
| Async HTTP helper | aiohttp | 3.11 .x | Already in `pyproject.toml` (used only for JWT fetch tests) |
41+
| Testing | pytest-asyncio | ≥0.23.8 | In dev-deps |
42+
43+
Add `websockets>=12,<13` to the existing `webrtc` optional-dependency group and run `uv add websockets --group webrtc`.
44+
45+
Supported Python: 3.9 – 3.12 (per project `requires-python`).
46+
47+
---
48+
49+
## 3 Directory & Module Layout
50+
51+
```
52+
getstream/
53+
└─ video/
54+
└─ rtc/
55+
└─ coordinator/
56+
├─ __init__.py # Public re-exports
57+
├─ ws.py # StreamAPIWS implementation
58+
├─ backoff.py # Reusable exponential strategy (tiny helper)
59+
└─ tests/
60+
├─ test_connect.py
61+
├─ test_heartbeat.py
62+
├─ test_reconnect.py
63+
└─ assets/ # JSON fixtures if needed
64+
```
65+
66+
---
67+
68+
## 4 API Shape
69+
70+
```python
71+
class StreamAPIWS(pyee.AsyncIOEventEmitter):
72+
def __init__(
73+
self,
74+
api_key: str,
75+
token: str,
76+
user_id: str,
77+
*,
78+
uri: str = DEFAULT_WS_URI,
79+
healthcheck_interval: float = 25.0,
80+
healthcheck_timeout: float = 30.0,
81+
max_retries: int = 5,
82+
backoff_base: float = 1.0,
83+
backoff_factor: float = 2.0,
84+
logger: logging.Logger | None = None,
85+
): ...
86+
87+
async def connect(self) -> dict:
88+
"""Open the socket, authenticate, wait for first server frame.
89+
Returns the first event payload (usually {"type": "connection.ok"}).
90+
Raises StreamWSConnectionError if the first frame has type == "error"
91+
or if authentication times out.
92+
"""
93+
94+
async def disconnect(self) -> None:
95+
"""Cancel internal tasks and close the WebSocket cleanly."""
96+
```
97+
98+
All kwargs have sensible defaults but remain configurable for unit tests.
99+
100+
---
101+
102+
## 5 Connection Lifecycle
103+
104+
1. **TCP / TLS / WS handshake** via `websockets.connect(uri, ..., ping_interval=None)`
105+
(disable the library’s automatic pings; we handle heart-beats ourselves).
106+
107+
2. **Send authentication payload immediately**
108+
109+
```json
110+
{
111+
"token": "<jwt_token>",
112+
"products": ["video"],
113+
"user_details": {"id": "<user_id>"}
114+
}
115+
```
116+
117+
3. **Await the first server message**
118+
• If `payload["type"] == "error"` → raise `StreamWSAuthError(payload)` (no retry).
119+
• Otherwise emit `payload` (`event.type` channel) and return it to caller.
120+
121+
4. **Start two background tasks**
122+
a. **_reader_task** – forever `await ws.recv()`, JSON-parse, emit.
123+
• Reset `last_received` timestamp on every frame.
124+
• On `websockets.exceptions.ConnectionClosedError`, jump to reconnection.
125+
b. **_heartbeat_task** – every `healthcheck_interval` seconds:
126+
• Send `{"type":"health.check"}`.
127+
• If `now - last_received > healthcheck_timeout`, treat as dead connection → reconnection.
128+
129+
5. **Reconnection strategy**
130+
`attempt = 0`
131+
• For network-driven closures OR heartbeat timeouts:
132+
```
133+
while attempt < max_retries:
134+
delay = backoff_base * backoff_factor**attempt # 1,2,4,8,16
135+
await asyncio.sleep(delay)
136+
try: await _open_socket_again()
137+
except OSError: attempt += 1; continue
138+
else: break
139+
else:
140+
raise StreamWSMaxRetriesExceeded()
141+
```
142+
• On success, authentication & first-message steps repeat transparently; internal tasks restart; events continue flowing.
143+
144+
6. **Graceful shutdown** (`disconnect()`):
145+
• Cancel heartbeat & reader tasks (with `asyncio.CancelledError` swallowed).
146+
• Close the WS (`await ws.close(code=1000, reason="client disconnect")`).
147+
148+
---
149+
150+
## 6 Event Dispatching
151+
152+
`pyee.AsyncIOEventEmitter` provides `.on()`, `.once()`, `.emit()`.
153+
Implementation detail: internally call `super().emit(event_type, payload)` on every inbound frame.
154+
Handlers receive the raw JSON dict (already parsed).
155+
156+
---
157+
158+
## 7 Logging
159+
160+
Use module-level logger:
161+
162+
```python
163+
logger = logging.getLogger(__name__)
164+
165+
logger.info("Connecting to coordinator", extra={"uri": uri})
166+
logger.debug("Sending auth payload")
167+
logger.warning("Reconnect attempt %s/%s failed", attempt, max_retries, exc_info=err)
168+
```
169+
170+
No handler configuration; rely on application to wire sinks & levels.
171+
172+
---
173+
174+
## 8 Exceptions
175+
176+
Create small hierarchy in `getstream.video.rtc.coordinator.errors`:
177+
178+
```python
179+
class StreamWSException(Exception): ...
180+
class StreamWSAuthError(StreamWSException): ...
181+
class StreamWSMaxRetriesExceeded(StreamWSException): ...
182+
class StreamWSConnectionError(StreamWSException): ...
183+
```
184+
185+
First-frame `"error"` -> `StreamWSAuthError`, transient socket failures -> `StreamWSConnectionError`.
186+
187+
---
188+
189+
## 9 Testing Strategy
190+
191+
**Happy path**: use `pytest-asyncio` with a local `websockets.server` that echoes health checks.
192+
**Error-on-first‐frame**: server yields `{"type":"error"}` then closes; ensure `connect()` raises & no retries.
193+
**Heartbeat timeout**: freeze time or delay server responses; verify client reconnects after 30 s.
194+
**Exponential back-off**: monkey-patch `asyncio.sleep` to record delays (1,2,4,8,16).
195+
**Disconnect API**: mock ws.close(), ensure tasks are cancelled.
196+
197+
Helper fixtures can live in `tests/conftest.py`.
198+
199+
---
200+
201+
## 10 Future Extensions (out-of-scope for v1)
202+
203+
• Custom back-off strategies via callable injection
204+
• Jitter addition to back-off
205+
• Multi-consumer fan-out (`async for payload in ws_client.listen("*")`)
206+
• TLS termination metrics, RTT histogram export
207+
208+
---
209+
210+
## 11 Next Steps
211+
212+
1. Add `websockets` to the `webrtc` dep group (`uv add websockets --group webrtc`).
213+
2. Scaffold modules & tests as per §3.
214+
3. Implement `backoff.py`, then `ws.py` iteratively with unit tests.
215+
4. Run `uv run pytest` and `uv run ruff format getstream/ tests/`.
216+
5. Write library changelog & update README.
217+
218+
Once you’re happy with this design, we can start coding the implementation.
219+
220+
221+
222+
Below is a Cursor-friendly “recipe” that breaks the whole feature into 7 incremental, self-contained steps.
223+
Follow them in order; after finishing each one, run the indicated commands to verify that everything still passes before moving on.
224+
225+
-------------------------------------------------
226+
STEP 1 – Add the external dependency
227+
-------------------------------------------------
228+
Files to touch
229+
`pyproject.toml`
230+
231+
Actions
232+
1. Under `[project.optional-dependencies].webrtc` append
233+
```
234+
"websockets>=12,<13",
235+
```
236+
2. From the project root, install the new dep:
237+
```
238+
uv add websockets --group webrtc
239+
uv sync --dev --all-packages
240+
```
241+
Tests
242+
• Nothing to run yet, but confirm lockfile updates cleanly:
243+
```
244+
uv run python -c "import websockets, pyee, aiohttp; print('deps OK')"
245+
```
246+
247+
-------------------------------------------------
248+
STEP 2 – Scaffold the module layout
249+
-------------------------------------------------
250+
Files/directories to create
251+
`getstream/video/rtc/coordinator/__init__.py`
252+
`getstream/video/rtc/coordinator/ws.py` (empty placeholder)
253+
`getstream/video/rtc/coordinator/backoff.py` (empty placeholder)
254+
`getstream/video/rtc/coordinator/errors.py` (empty placeholder)
255+
256+
Actions
257+
1. Create each file with just a docstring and `logger = logging.getLogger(__name__)` (where relevant).
258+
2. Re-export the public class in `__init__.py`:
259+
```python
260+
from .ws import StreamAPIWS # noqa: F401
261+
```
262+
Tests
263+
• Run the full suite to ensure you didn’t break import graphs:
264+
```
265+
uv run pytest -q
266+
```
267+
268+
-------------------------------------------------
269+
STEP 3 – Implement exponential-back-off helper
270+
-------------------------------------------------
271+
File to edit
272+
`getstream/video/rtc/coordinator/backoff.py`
273+
274+
Actions
275+
1. Add `async def exp_backoff(max_retries:int, base:float=1.0, factor:float=2.0): -> AsyncIterator[float]`
276+
2. Yield the successive delays (e.g. 1,2,4,8,16) until `max_retries` exhausted.
277+
278+
Tests
279+
Create `getstream/video/rtc/coordinator/tests/test_backoff.py` with a small `pytest-asyncio` test that collects the yielded seconds and asserts the sequence equals `[1,2,4,8,16]` for `max_retries=5`.
280+
281+
Run
282+
```
283+
uv run pytest getstream/video/rtc/coordinator/tests/test_backoff.py -q
284+
```
285+
286+
-------------------------------------------------
287+
STEP 4 – Define the error hierarchy
288+
-------------------------------------------------
289+
File to edit
290+
`getstream/video/rtc/coordinator/errors.py`
291+
292+
Actions
293+
Implement:
294+
```python
295+
class StreamWSException(Exception): ...
296+
class StreamWSAuthError(StreamWSException): ...
297+
class StreamWSConnectionError(StreamWSException): ...
298+
class StreamWSMaxRetriesExceeded(StreamWSException): ...
299+
```
300+
301+
Tests
302+
Add `tests/test_errors.py` verifying simple inheritance relationships.
303+
304+
Run
305+
```
306+
uv run pytest getstream/video/rtc/coordinator/tests/test_errors.py -q
307+
```
308+
309+
-------------------------------------------------
310+
STEP 5 – Build the minimal StreamAPIWS (connect / disconnect only)
311+
-------------------------------------------------
312+
File to edit
313+
`getstream/video/rtc/coordinator/ws.py`
314+
315+
Actions
316+
1. Make `StreamAPIWS` inherit from `pyee.AsyncIOEventEmitter`.
317+
2. Implement `__init__`, `_build_auth_payload`, `connect()`, `disconnect()`.
318+
– For now, skip heartbeat and retry logic; simply open the socket, send auth, wait for first frame, raise `StreamWSAuthError` if `type=="error"`.
319+
3. Add module-level logger.
320+
321+
Tests
322+
`tests/test_connect.py`
323+
• Spin up an in-process mock server with `websockets.serve` that sends a single `{"type":"connection.ok"}` frame.
324+
• Assert `connect()` returns that payload.
325+
• Another test where the server sends `{"type":"error"}` and ensure `StreamWSAuthError` is raised and the socket is closed.
326+
327+
Run
328+
```
329+
uv run pytest getstream/video/rtc/coordinator/tests/test_connect.py -q
330+
```
331+
332+
-------------------------------------------------
333+
STEP 6 – Add heartbeat (25 s send, 30 s receive) and auto-reconnect
334+
-------------------------------------------------
335+
File to edit
336+
`getstream/video/rtc/coordinator/ws.py`
337+
338+
Actions
339+
1. Add two background tasks started by `connect()`:
340+
`_reader_task` – reads frames, emits events, updates `self._last_received`.
341+
`_heartbeat_task` – every 25 s sends `{"type":"health.check"}`; if `now - _last_received > 30 s` triggers `_reconnect()`.
342+
2. Implement internal `_reconnect()` using the helper from step 3 with `max_retries=5`.
343+
344+
Tests
345+
`tests/test_heartbeat.py`
346+
• Use `freezegun.advance_by` or manual `asyncio.sleep` patching to imitate time passage and verify reconnect is attempted after 30 s of silence.
347+
• Ensure that on success the client continues emitting events.
348+
349+
Run
350+
```
351+
uv run pytest getstream/video/rtc/coordinator/tests/test_heartbeat.py -q
352+
```
353+
354+
-------------------------------------------------
355+
STEP 7 – Final polish: logging, ruff, full suite
356+
-------------------------------------------------
357+
Files to touch
358+
`getstream/video/rtc/coordinator/ws.py` (add informative `logger.debug/info/warning` calls)
359+
• Any `__init__.py` for re-exports if needed.
360+
361+
Actions
362+
1. Ensure every catch-block logs `exc_info`.
363+
2. Run formatter & linter:
364+
```
365+
uv run ruff format getstream/ tests/
366+
uv run ruff check getstream/ tests/ --fix
367+
```
368+
3. Run the entire test matrix:
369+
```
370+
uv run pytest -q
371+
```
372+
373+
If everything is green, the feature is complete and ready for PR review.
374+
375+
-------------------------------------------------
376+
How to iterate
377+
Work through one step at a time, commit, run the specified tests, and only then proceed.
378+
This keeps the Cursor change-set small and the feedback loop fast.

getstream/plugins/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ from getstream.plugins import STT, TTS, VAD
8484

8585
## Creating New Plugins
8686

87-
See the [Plugin Development Guide](../../ai/instructions/projects/ai-plugin.md) for detailed instructions on creating new plugins. The workspace setup makes plugin development straightforward:
87+
See the [Plugin Development Guide](../../ai/instructions/ai-plugin.md) for detailed instructions on creating new plugins. The workspace setup makes plugin development straightforward:
8888

8989
1. Create your plugin directory under the appropriate type folder
9090
2. Add a `pyproject.toml` with `getstream[webrtc]` as a dependency

0 commit comments

Comments
 (0)