Skip to content

Commit b54c955

Browse files
authored
E2E Test: audio/video publish & subscribe (#632)
1 parent ff47b46 commit b54c955

2 files changed

Lines changed: 546 additions & 0 deletions

File tree

livekit-rtc/tests/test_audio.py

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
# Copyright 2026 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""End-to-end audio publish/subscribe tests."""
16+
17+
import asyncio
18+
import ctypes
19+
import math
20+
import os
21+
import uuid
22+
import wave
23+
from pathlib import Path
24+
25+
import numpy as np
26+
import pytest
27+
28+
from livekit import api, rtc
29+
from livekit.rtc.audio_frame import AudioFrame
30+
31+
32+
SAMPLE_RATE = 48000
33+
NUM_CHANNELS = 1
34+
TONE_DURATION_SEC = 1.0
35+
FREQUENCIES_HZ = [100, 300, 500, 700, 1000]
36+
AMPLITUDE = 0.5
37+
38+
39+
def skip_if_no_credentials():
40+
required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"]
41+
missing = [var for var in required_vars if not os.getenv(var)]
42+
return pytest.mark.skipif(
43+
bool(missing), reason=f"Missing environment variables: {', '.join(missing)}"
44+
)
45+
46+
47+
def create_token(identity: str, room_name: str) -> str:
48+
return (
49+
api.AccessToken()
50+
.with_identity(identity)
51+
.with_name(identity)
52+
.with_grants(
53+
api.VideoGrants(
54+
room_join=True,
55+
room=room_name,
56+
)
57+
)
58+
.to_jwt()
59+
)
60+
61+
62+
def unique_room_name(base: str) -> str:
63+
return f"{base}-{uuid.uuid4().hex[:8]}"
64+
65+
66+
def _generate_sine_wave(
67+
frequency: int,
68+
sample_rate: int,
69+
num_channels: int,
70+
duration_sec: float,
71+
amplitude: float = 0.5,
72+
) -> AudioFrame:
73+
"""Generate an AudioFrame containing a sine wave at the given frequency."""
74+
samples_per_channel = int(sample_rate * duration_sec)
75+
t = np.arange(samples_per_channel, dtype=np.float64) / sample_rate
76+
wave_signal = np.sin(2.0 * math.pi * frequency * t) * amplitude
77+
pcm = (wave_signal * np.iinfo(np.int16).max).astype(np.int16)
78+
79+
if num_channels > 1:
80+
pcm = np.repeat(pcm[:, np.newaxis], num_channels, axis=1).reshape(-1)
81+
82+
return AudioFrame(
83+
data=pcm.tobytes(),
84+
sample_rate=sample_rate,
85+
num_channels=num_channels,
86+
samples_per_channel=samples_per_channel,
87+
)
88+
89+
90+
def _frame_to_mono_float(frame: AudioFrame) -> np.ndarray:
91+
"""Decode an int16 AudioFrame into a normalized float64 mono signal."""
92+
samples = np.frombuffer(bytes(frame.data.cast("B")), dtype=np.int16).astype(np.float64)
93+
if frame.num_channels > 1:
94+
samples = samples.reshape(-1, frame.num_channels).mean(axis=1)
95+
return samples / float(np.iinfo(np.int16).max)
96+
97+
98+
def _fft_spectrum(frame: AudioFrame) -> tuple[np.ndarray, np.ndarray]:
99+
"""Return (freqs, magnitudes) from a Hann-windowed rfft of `frame`."""
100+
signal = _frame_to_mono_float(frame)
101+
window = np.hanning(len(signal))
102+
# Compensate for the Hann window's coherent gain so magnitudes stay comparable.
103+
spectrum = np.fft.rfft(signal * window) / (np.sum(window) / 2.0)
104+
magnitudes = np.abs(spectrum)
105+
freqs = np.fft.rfftfreq(len(signal), d=1.0 / frame.sample_rate)
106+
return freqs, magnitudes
107+
108+
109+
def _detect_peak_frequency(frame: AudioFrame) -> float:
110+
"""Return the frequency bin with the largest magnitude in `frame`."""
111+
freqs, magnitudes = _fft_spectrum(frame)
112+
return float(freqs[int(np.argmax(magnitudes))])
113+
114+
115+
def _band_energies(
116+
frame: AudioFrame,
117+
centers: list[int],
118+
bandwidth_hz: float = 20.0,
119+
) -> dict[int, float]:
120+
"""Sum squared-magnitude (energy) in narrow bands centered at each frequency."""
121+
freqs, magnitudes = _fft_spectrum(frame)
122+
power = magnitudes**2
123+
return {
124+
center: float(
125+
np.sum(power[(freqs >= center - bandwidth_hz) & (freqs <= center + bandwidth_hz)])
126+
)
127+
for center in centers
128+
}
129+
130+
131+
@skip_if_no_credentials()
132+
class TestAudioStreamPublishSubscribe:
133+
"""End-to-end: publish a sine sweep into a room and verify spectrum on the subscriber."""
134+
135+
async def test_audio_stream_publish_subscribe(self):
136+
"""Publish 5 seconds of 100/300/500/700/1000 Hz tones and FFT-verify received audio."""
137+
url = os.environ["LIVEKIT_URL"]
138+
room_name = unique_room_name("test-audio-sweep")
139+
140+
publisher_room = rtc.Room()
141+
subscriber_room = rtc.Room()
142+
143+
publisher_token = create_token("audio-sweep-publisher", room_name)
144+
subscriber_token = create_token("audio-sweep-subscriber", room_name)
145+
146+
track_subscribed_event = asyncio.Event()
147+
subscribed_track: rtc.Track | None = None
148+
149+
@subscriber_room.on("track_subscribed")
150+
def on_track_subscribed(
151+
track: rtc.Track,
152+
publication: rtc.RemoteTrackPublication,
153+
participant: rtc.RemoteParticipant,
154+
):
155+
nonlocal subscribed_track
156+
if track.kind == rtc.TrackKind.KIND_AUDIO:
157+
subscribed_track = track
158+
track_subscribed_event.set()
159+
160+
try:
161+
await subscriber_room.connect(url, subscriber_token)
162+
await publisher_room.connect(url, publisher_token)
163+
164+
source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
165+
track = rtc.LocalAudioTrack.create_audio_track("sine-sweep", source)
166+
options = rtc.TrackPublishOptions()
167+
options.source = rtc.TrackSource.SOURCE_MICROPHONE
168+
await publisher_room.local_participant.publish_track(track, options)
169+
170+
await asyncio.wait_for(track_subscribed_event.wait(), timeout=10.0)
171+
assert subscribed_track is not None
172+
173+
audio_stream = rtc.AudioStream(
174+
subscribed_track,
175+
sample_rate=SAMPLE_RATE,
176+
num_channels=NUM_CHANNELS,
177+
)
178+
179+
total_duration = TONE_DURATION_SEC * len(FREQUENCIES_HZ)
180+
target_samples = int(SAMPLE_RATE * total_duration)
181+
# Collect a little extra to tolerate codec startup latency.
182+
collect_samples_target = target_samples + int(SAMPLE_RATE * 1.0)
183+
184+
async def publish_tones() -> None:
185+
await track_subscribed_event.wait()
186+
for freq in FREQUENCIES_HZ:
187+
frame = _generate_sine_wave(
188+
freq,
189+
SAMPLE_RATE,
190+
NUM_CHANNELS,
191+
TONE_DURATION_SEC,
192+
AMPLITUDE,
193+
)
194+
await source.capture_frame(frame)
195+
await source.wait_for_playout()
196+
197+
async def collect_samples() -> np.ndarray:
198+
buffers: list[np.ndarray] = []
199+
total = 0
200+
async for event in audio_stream:
201+
chunk = np.frombuffer(bytes(event.frame.data.cast("B")), dtype=np.int16)
202+
buffers.append(chunk)
203+
total += len(chunk)
204+
if total >= collect_samples_target:
205+
break
206+
return np.concatenate(buffers) if buffers else np.array([], dtype=np.int16)
207+
208+
publish_task = asyncio.create_task(publish_tones())
209+
received = await asyncio.wait_for(collect_samples(), timeout=20.0)
210+
await publish_task
211+
await audio_stream.aclose()
212+
await source.aclose()
213+
214+
assert len(received) >= target_samples, (
215+
f"Expected at least {target_samples} samples, got {len(received)}"
216+
)
217+
218+
recv_wav_path = Path(__file__).parent / "subscriber_recv_freqs.wav"
219+
with wave.open(str(recv_wav_path), "wb") as wav_out:
220+
wav_out.setnchannels(NUM_CHANNELS)
221+
wav_out.setsampwidth(ctypes.sizeof(ctypes.c_int16))
222+
wav_out.setframerate(SAMPLE_RATE)
223+
wav_out.writeframes(received.tobytes())
224+
225+
# Find signal onset to skip codec startup silence.
226+
envelope = np.abs(received.astype(np.float32))
227+
threshold = float(envelope.max()) * 0.2
228+
onset_candidates = np.where(envelope > threshold)[0]
229+
assert onset_candidates.size > 0, "Received audio contains only silence"
230+
onset = int(onset_candidates[0])
231+
232+
samples_per_tone = int(SAMPLE_RATE * TONE_DURATION_SEC)
233+
# Analyze the middle slice of each tone window to avoid boundary transitions.
234+
analysis_margin = int(SAMPLE_RATE * 0.2)
235+
analysis_length = samples_per_tone - 2 * analysis_margin
236+
237+
per_tone_peaks: list[tuple[int, float]] = []
238+
for idx, expected_freq in enumerate(FREQUENCIES_HZ):
239+
start = onset + idx * samples_per_tone + analysis_margin
240+
end = start + analysis_length
241+
assert end <= len(received), (
242+
f"Not enough samples for tone {idx} (expected {expected_freq} Hz): "
243+
f"need {end}, have {len(received)}"
244+
)
245+
segment = received[start:end]
246+
segment_frame = AudioFrame(
247+
data=segment.tobytes(),
248+
sample_rate=SAMPLE_RATE,
249+
num_channels=NUM_CHANNELS,
250+
samples_per_channel=len(segment),
251+
)
252+
peak_hz = _detect_peak_frequency(segment_frame)
253+
per_tone_peaks.append((expected_freq, peak_hz))
254+
255+
# Opus transcoding adds spectral jitter; allow a 15 Hz tolerance.
256+
assert peak_hz == pytest.approx(expected_freq, abs=15.0), (
257+
f"Tone {idx}: expected {expected_freq} Hz, got peak at {peak_hz:.1f} Hz. "
258+
f"All peaks: {per_tone_peaks}"
259+
)
260+
261+
# The target band should also dominate the other sweep bands.
262+
energies = _band_energies(segment_frame, FREQUENCIES_HZ, bandwidth_hz=30.0)
263+
target_energy = energies[expected_freq]
264+
other_energy = sum(v for k, v in energies.items() if k != expected_freq)
265+
assert target_energy > 5.0 * max(other_energy, 1e-12), (
266+
f"Tone {idx} ({expected_freq} Hz) did not dominate other bands: "
267+
f"target={target_energy:.3e}, other={other_energy:.3e}"
268+
)
269+
finally:
270+
await publisher_room.disconnect()
271+
await subscriber_room.disconnect()

0 commit comments

Comments
 (0)