Skip to content

Commit fdd7459

Browse files
cursoragentangelo
andcommitted
Refactor audio interfaces to properly manage resources
Initialize and clean up audio streams and PyAudio instances. Add a _started flag to prevent double starts and ensure proper cleanup. Co-authored-by: angelo <angelo@elevenlabs.io>
1 parent 6949773 commit fdd7459

1 file changed

Lines changed: 84 additions & 28 deletions

File tree

src/elevenlabs/conversational_ai/default_audio_interface.py

Lines changed: 84 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,24 @@ def __init__(self):
1818
self.pyaudio = pyaudio
1919
self.should_stop = threading.Event()
2020
self.output_thread = None
21+
self.output_queue = None
22+
self.in_stream = None
23+
self.out_stream = None
24+
self.p = None
25+
self.input_callback = None
26+
self._started = False
2127

2228
def start(self, input_callback: Callable[[bytes], None]):
29+
if self._started:
30+
# If already started, stop first to avoid resource leaks
31+
self.stop()
32+
2333
# Audio input is using callbacks from pyaudio which we simply pass through.
2434
self.input_callback = input_callback
2535

2636
# Audio output is buffered so we can handle interruptions.
2737
# Start a separate thread to handle writing to the output stream.
28-
self.output_queue: queue.Queue[bytes] = queue.Queue()
38+
self.output_queue = queue.Queue()
2939
self.should_stop.clear() # Reset the event in case start is called multiple times
3040
self.output_thread = threading.Thread(target=self._output_thread)
3141

@@ -49,32 +59,50 @@ def start(self, input_callback: Callable[[bytes], None]):
4959
)
5060

5161
self.output_thread.start()
62+
self._started = True
5263

5364
def stop(self):
65+
if not self._started:
66+
return # Nothing to stop
67+
5468
self.should_stop.set()
69+
5570
if self.output_thread and self.output_thread.is_alive():
5671
self.output_thread.join()
57-
if hasattr(self, 'in_stream'):
72+
73+
if self.in_stream:
5874
self.in_stream.stop_stream()
5975
self.in_stream.close()
60-
if hasattr(self, 'out_stream'):
76+
self.in_stream = None
77+
78+
if self.out_stream:
6179
self.out_stream.close()
62-
if hasattr(self, 'p'):
80+
self.out_stream = None
81+
82+
if self.p:
6383
self.p.terminate()
84+
self.p = None
85+
86+
self.output_thread = None
87+
self.output_queue = None
88+
self.input_callback = None
89+
self._started = False
6490

6591
def output(self, audio: bytes):
66-
self.output_queue.put(audio)
92+
if self.output_queue:
93+
self.output_queue.put(audio)
6794

6895
def interrupt(self):
6996
# Clear the output queue to stop any audio that is currently playing.
7097
# Note: We can't atomically clear the whole queue, but we are doing
7198
# it from the message handling thread so no new audio will be added
7299
# while we are clearing.
73-
try:
74-
while True:
75-
_ = self.output_queue.get(block=False)
76-
except queue.Empty:
77-
pass
100+
if self.output_queue:
101+
try:
102+
while True:
103+
_ = self.output_queue.get(block=False)
104+
except queue.Empty:
105+
pass
78106

79107
def _output_thread(self):
80108
while not self.should_stop.is_set():
@@ -102,14 +130,24 @@ def __init__(self):
102130
self.pyaudio = pyaudio
103131
self.should_stop = asyncio.Event()
104132
self.output_task = None
133+
self.output_queue = None
134+
self.in_stream = None
135+
self.out_stream = None
136+
self.p = None
137+
self.input_callback = None
138+
self._started = False
105139

106140
async def start(self, input_callback: Callable[[bytes], Awaitable[None]]):
141+
if self._started:
142+
# If already started, stop first to avoid resource leaks
143+
await self.stop()
144+
107145
# Audio input is using callbacks from pyaudio which we adapt to async
108146
self.input_callback = input_callback
109147

110148
# Audio output is buffered so we can handle interruptions.
111149
# Start a separate task to handle writing to the output stream.
112-
self.output_queue: asyncio.Queue[bytes] = asyncio.Queue()
150+
self.output_queue = asyncio.Queue()
113151
self.should_stop.clear() # Reset the event in case start is called multiple times
114152

115153
self.p = self.pyaudio.PyAudio()
@@ -133,37 +171,55 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]):
133171

134172
# Start the output task
135173
self.output_task = asyncio.create_task(self._output_task())
174+
self._started = True
136175

137176
async def stop(self):
177+
if not self._started:
178+
return # Nothing to stop
179+
138180
self.should_stop.set()
181+
139182
if self.output_task and not self.output_task.done():
140183
await self.output_task
141-
if hasattr(self, 'in_stream'):
184+
185+
if self.in_stream:
142186
self.in_stream.stop_stream()
143187
self.in_stream.close()
144-
if hasattr(self, 'out_stream'):
188+
self.in_stream = None
189+
190+
if self.out_stream:
145191
self.out_stream.close()
146-
if hasattr(self, 'p'):
192+
self.out_stream = None
193+
194+
if self.p:
147195
self.p.terminate()
196+
self.p = None
197+
198+
self.output_task = None
199+
self.output_queue = None
200+
self.input_callback = None
201+
self._started = False
148202

149203
async def output(self, audio: bytes):
150-
await self.output_queue.put(audio)
204+
if self.output_queue:
205+
await self.output_queue.put(audio)
151206

152207
async def interrupt(self):
153208
# Clear the output queue to stop any audio that is currently playing.
154-
try:
155-
while True:
156-
try:
157-
_ = self.output_queue.get_nowait()
158-
except asyncio.QueueEmpty:
159-
break
160-
except AttributeError:
161-
# In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty
162-
while not self.output_queue.empty():
163-
try:
164-
_ = self.output_queue.get_nowait()
165-
except:
166-
break
209+
if self.output_queue:
210+
try:
211+
while True:
212+
try:
213+
_ = self.output_queue.get_nowait()
214+
except asyncio.QueueEmpty:
215+
break
216+
except AttributeError:
217+
# In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty
218+
while not self.output_queue.empty():
219+
try:
220+
_ = self.output_queue.get_nowait()
221+
except:
222+
break
167223

168224
async def _output_task(self):
169225
while not self.should_stop.is_set():

0 commit comments

Comments
 (0)