-
Notifications
You must be signed in to change notification settings - Fork 120
Expand file tree
/
Copy pathutils.py
More file actions
138 lines (111 loc) · 4.23 KB
/
Copy pathutils.py
File metadata and controls
138 lines (111 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from __future__ import annotations
from typing import AsyncIterator
from .audio_frame import AudioFrame
__all__ = ["combine_audio_frames", "sine_wave_generator"]
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
"""
Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`.
This function concatenates the audio data from multiple frames, ensuring that
all frames have the same sample rate and number of channels. It efficiently
merges the data by preallocating the necessary memory and copying the frame
data without unnecessary reallocations.
Args:
buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame`
objects to be combined.
Returns:
rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data.
Raises:
ValueError: If the buffer is empty.
ValueError: If frames have differing sample rates.
ValueError: If frames have differing numbers of channels.
Example:
>>> frame1 = rtc.AudioFrame(
... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data
b'\x01\x02\x03\x04'
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
"""
if not isinstance(buffer, list):
return buffer
if not buffer:
raise ValueError("buffer is empty")
sample_rate = buffer[0].sample_rate
num_channels = buffer[0].num_channels
total_data_length = 0
total_samples_per_channel = 0
for frame in buffer:
if frame.sample_rate != sample_rate:
raise ValueError(
f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}"
)
if frame.num_channels != num_channels:
raise ValueError(
f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}"
)
total_data_length += len(frame.data)
total_samples_per_channel += frame.samples_per_channel
data = bytearray(total_data_length)
offset = 0
for frame in buffer:
frame_data = frame.data.cast("b")
data[offset : offset + len(frame_data)] = frame_data
offset += len(frame_data)
return AudioFrame(
data=data,
sample_rate=sample_rate,
num_channels=num_channels,
samples_per_channel=total_samples_per_channel,
)
async def sine_wave_generator(
freq: float,
duration: float,
sample_rate: int = 48000,
amplitude: float = 0.3,
) -> AsyncIterator[AudioFrame]:
"""
Generate sine wave audio frames.
Useful for testing audio pipelines and generating test signals.
Args:
freq: Frequency of the sine wave in Hz.
duration: Duration of the audio in seconds.
sample_rate: Sample rate in Hz (default: 48000).
amplitude: Amplitude of the sine wave, range [0.0, 1.0] (default: 0.3).
Yields:
AudioFrame: Audio frames containing sine wave data.
Example:
>>> import asyncio
>>> async def generate_audio():
... async for frame in sine_wave_generator(440, 1.0):
... print(f"Generated frame with {frame.samples_per_channel} samples")
>>> asyncio.run(generate_audio())
"""
try:
import numpy as np
except ImportError:
raise ImportError(
"numpy is required for sine_wave_generator. Install it with: pip install numpy"
)
blocksize = sample_rate // 10
total_frames = int((duration * sample_rate) // blocksize)
t_frame = np.arange(blocksize) / sample_rate
for i in range(total_frames):
t = t_frame + i * blocksize / sample_rate
signal = amplitude * np.sin(2 * np.pi * freq * t)
signal_int16 = np.int16(signal * 32767)
frame = AudioFrame(
signal_int16.tobytes(),
sample_rate,
1,
blocksize,
)
yield frame