Skip to content

Commit f9e8069

Browse files
committed
nits
1 parent c40a6f9 commit f9e8069

3 files changed

Lines changed: 167 additions & 71 deletions

File tree

src/elevenlabs/conversational_ai/base_connection.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,22 @@ async def send_audio(self, audio_data: bytes) -> None:
3939
"""Send audio data through the connection."""
4040
pass
4141

42+
def send_message_sync(self, message: dict) -> None:
43+
"""Send a message synchronously (for compatibility with sync code)."""
44+
import asyncio
45+
try:
46+
# Try to get the current event loop
47+
loop = asyncio.get_event_loop()
48+
if loop.is_running():
49+
# If loop is running, create a task
50+
asyncio.create_task(self.send_message(message))
51+
else:
52+
# If loop is not running, run the coroutine
53+
loop.run_until_complete(self.send_message(message))
54+
except RuntimeError:
55+
# No event loop, create new one
56+
asyncio.run(self.send_message(message))
57+
4258
def on_message(self, callback: Callable[[dict], Union[None, Awaitable[None]]]) -> None:
4359
"""Set the message callback."""
4460
self._on_message_callback = callback

src/elevenlabs/conversational_ai/conversation.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -622,17 +622,27 @@ def end_session(self):
622622
if self._connection:
623623
connection_type = self._determine_connection_type()
624624
if connection_type == ConnectionType.WEBRTC:
625-
# For WebRTC, we need to close the connection in an async context
626625
import asyncio
627626
try:
628-
loop = asyncio.get_event_loop()
629-
if loop.is_running():
630-
asyncio.create_task(self._connection.close())
631-
else:
632-
loop.run_until_complete(self._connection.close())
633-
except RuntimeError:
634-
# No event loop running, create a new one
635-
asyncio.run(self._connection.close())
627+
try:
628+
loop = asyncio.get_event_loop()
629+
if loop.is_running():
630+
task = asyncio.create_task(self._connection.close())
631+
else:
632+
asyncio.wait_for(
633+
loop.run_until_complete(self._connection.close()),
634+
timeout=5.0
635+
)
636+
except RuntimeError:
637+
async def cleanup():
638+
await asyncio.wait_for(self._connection.close(), timeout=5.0)
639+
640+
asyncio.run(cleanup())
641+
642+
except asyncio.TimeoutError:
643+
print("Warning: WebRTC connection cleanup timed out")
644+
except Exception as e:
645+
print(f"Warning: Error during WebRTC connection cleanup: {e}")
636646
self._connection = None
637647

638648
if self.callback_end_session:
@@ -678,9 +688,7 @@ def send_user_message(self, text: str):
678688

679689
event = UserMessageClientToOrchestratorEvent(text=text)
680690
try:
681-
# Send through WebRTC connection
682-
import asyncio
683-
asyncio.create_task(self._connection.send_message(event.to_dict()))
691+
self._connection.send_message_sync(event.to_dict())
684692
except Exception as e:
685693
print(f"Error sending user message: {e}")
686694
raise
@@ -768,8 +776,15 @@ def _run_webrtc(self):
768776
try:
769777
# Connect to WebRTC
770778
import asyncio
771-
loop = asyncio.new_event_loop()
772-
asyncio.set_event_loop(loop)
779+
780+
try:
781+
loop = asyncio.get_event_loop()
782+
if loop.is_closed():
783+
loop = asyncio.new_event_loop()
784+
asyncio.set_event_loop(loop)
785+
except RuntimeError:
786+
loop = asyncio.new_event_loop()
787+
asyncio.set_event_loop(loop)
773788

774789
async def webrtc_session():
775790
await self._connection.connect()

src/elevenlabs/conversational_ai/webrtc_connection.py

Lines changed: 122 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22
import asyncio
33
from typing import Optional, Dict, Any, Callable, Union, Awaitable
44
import httpx
5-
from livekit.rtc import Room, TrackKind
5+
6+
try:
7+
from livekit.rtc import Room, TrackKind
8+
except ImportError:
9+
raise ImportError(
10+
"livekit package is required for WebRTC support. "
11+
"Install with: pip install livekit"
12+
)
613

714
from .base_connection import BaseConnection
815

@@ -17,7 +24,7 @@ def __init__(
1724
api_origin: Optional[str] = None,
1825
overrides: Optional[Dict[str, Any]] = None,
1926
on_debug: Optional[Callable[[Dict[str, Any]], None]] = None,
20-
):
27+
) -> None:
2128
self.conversation_token = conversation_token
2229
self.agent_id = agent_id
2330
self.livekit_url = livekit_url
@@ -40,7 +47,7 @@ def __init__(
4047
api_origin: Optional[str] = None,
4148
overrides: Optional[Dict[str, Any]] = None,
4249
on_debug: Optional[Callable[[Dict[str, Any]], None]] = None,
43-
):
50+
) -> None:
4451
super().__init__()
4552
self.conversation_token = conversation_token
4653
self.agent_id = agent_id
@@ -49,7 +56,7 @@ def __init__(
4956
self.overrides = overrides or {}
5057
self.on_debug = on_debug
5158
self._room: Optional[Room] = None
52-
self._is_connected = False
59+
self._is_connected: bool = False
5360

5461
@classmethod
5562
async def create(cls, config: WebRTCConnectionConfig) -> "WebRTCConnection":
@@ -68,46 +75,82 @@ async def create(cls, config: WebRTCConnectionConfig) -> "WebRTCConnection":
6875

6976
async def connect(self) -> None:
7077
"""Establish the WebRTC connection using LiveKit."""
71-
# Get conversation token if not provided
72-
if not self.conversation_token:
73-
if not self.agent_id:
74-
raise ValueError("Either conversation_token or agent_id is required for WebRTC connection")
75-
self.conversation_token = await self._fetch_conversation_token()
76-
77-
# Create room and connect
78-
self._room = Room()
79-
self._setup_room_callbacks()
80-
81-
# Connect to LiveKit room using configurable URL
82-
await self._room.connect(self.livekit_url, self.conversation_token)
83-
self._is_connected = True
84-
85-
# Set conversation ID from room name if available
86-
if self._room.name:
87-
# Extract conversation ID from room name if it contains one
88-
import re
89-
match = re.search(r'(conv_[a-zA-Z0-9]+)', self._room.name)
90-
self.conversation_id = match.group(0) if match else self._room.name
91-
else:
92-
self.conversation_id = f"webrtc-{id(self)}"
93-
94-
# Enable microphone
95-
await self._room.local_participant.set_microphone_enabled(True)
96-
97-
# Send overrides if any
98-
if self.overrides:
99-
await self.send_message(self._construct_overrides())
78+
try:
79+
# Get conversation token if not provided
80+
if not self.conversation_token:
81+
if not self.agent_id:
82+
raise ValueError("Either conversation_token or agent_id is required for WebRTC connection")
83+
self.conversation_token = await self._fetch_conversation_token()
10084

101-
self.debug({
102-
"type": "conversation_initiation_client_data",
103-
"message": self._construct_overrides()
104-
})
85+
# Create room and connect
86+
self._room = Room()
87+
self._setup_room_callbacks()
88+
89+
# Connect to LiveKit room using configurable URL
90+
try:
91+
await self._room.connect(self.livekit_url, self.conversation_token)
92+
self._is_connected = True
93+
except Exception as e:
94+
self._is_connected = False
95+
raise ConnectionError(f"Failed to connect to LiveKit room: {e}") from e
96+
97+
# Set conversation ID from room name if available
98+
if self._room.name:
99+
# Extract conversation ID from room name if it contains one
100+
import re
101+
match = re.search(r'(conv_[a-zA-Z0-9]+)', self._room.name)
102+
self.conversation_id = match.group(0) if match else self._room.name
103+
else:
104+
self.conversation_id = f"webrtc-{id(self)}"
105+
106+
# Enable microphone
107+
try:
108+
await self._room.local_participant.set_microphone_enabled(True)
109+
except Exception as e:
110+
self.debug({
111+
"type": "microphone_enable_error",
112+
"error": str(e)
113+
})
114+
# Don't fail the connection for microphone issues
115+
116+
# Send overrides if any
117+
if self.overrides:
118+
try:
119+
await self.send_message(self._construct_overrides())
120+
except Exception as e:
121+
self.debug({
122+
"type": "overrides_send_error",
123+
"error": str(e)
124+
})
125+
126+
self.debug({
127+
"type": "conversation_initiation_client_data",
128+
"message": self._construct_overrides()
129+
})
130+
131+
except Exception as e:
132+
# Ensure cleanup on connection failure
133+
if self._room:
134+
try:
135+
await self._room.disconnect()
136+
except:
137+
pass
138+
self._room = None
139+
self._is_connected = False
140+
raise
105141

106142
async def close(self) -> None:
107143
"""Close the WebRTC connection."""
108144
if self._room:
109-
await self._room.disconnect()
110-
self._room = None
145+
try:
146+
await self._room.disconnect()
147+
except Exception as e:
148+
self.debug({
149+
"type": "disconnect_error",
150+
"error": str(e)
151+
})
152+
finally:
153+
self._room = None
111154
self._is_connected = False
112155

113156
async def send_message(self, message: dict) -> None:
@@ -148,32 +191,54 @@ async def _fetch_conversation_token(self) -> str:
148191
if not self.agent_id:
149192
raise ValueError("Agent ID is required to fetch conversation token")
150193

151-
# Get version and source from overrides or use defaults
152-
version = self.overrides.get("client", {}).get("version", "2.15.0") # From pyproject.toml
153-
source = self.overrides.get("client", {}).get("source", "python_sdk")
194+
try:
195+
# Get version and source from overrides or use defaults
196+
version = self.overrides.get("client", {}).get("version", "2.15.0") # From pyproject.toml
197+
source = self.overrides.get("client", {}).get("source", "python_sdk")
198+
199+
# Convert WSS origin to HTTPS for API calls
200+
api_origin = self._convert_wss_to_https(self.api_origin)
154201

155-
# Convert WSS origin to HTTPS for API calls
156-
api_origin = self._convert_wss_to_https(self.api_origin)
202+
url = f"{api_origin}/v1/convai/conversation/token?agent_id={self.agent_id}&source={source}&version={version}"
157203

158-
url = f"{api_origin}/v1/convai/conversation/token?agent_id={self.agent_id}&source={source}&version={version}"
204+
async with httpx.AsyncClient(timeout=30.0) as client:
205+
try:
206+
response = await client.get(url)
207+
except httpx.TimeoutException:
208+
raise ConnectionError(f"Timeout when fetching conversation token for agent {self.agent_id}")
209+
except httpx.NetworkError as e:
210+
raise ConnectionError(f"Network error when fetching conversation token: {e}")
159211

160-
async with httpx.AsyncClient() as client:
161-
response = await client.get(url)
212+
if not response.is_success:
213+
error_msg = f"ElevenLabs API returned {response.status_code} {response.reason_phrase}"
214+
if response.status_code == 401:
215+
error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided."
216+
elif response.status_code == 404:
217+
error_msg = f"Agent with ID {self.agent_id} not found"
218+
elif response.status_code == 429:
219+
error_msg = "Rate limit exceeded. Please try again later."
162220

163-
if not response.is_success:
164-
error_msg = f"ElevenLabs API returned {response.status_code} {response.reason_phrase}"
165-
if response.status_code == 401:
166-
error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided."
221+
raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {error_msg}")
167222

168-
raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {error_msg}")
223+
try:
224+
data = response.json()
225+
except Exception as e:
226+
raise Exception(f"Invalid JSON response from API: {e}")
169227

170-
data = response.json()
171-
token = data.get("token")
228+
token = data.get("token")
172229

173-
if not token:
174-
raise Exception("No conversation token received from API")
230+
if not token:
231+
raise Exception("No conversation token received from API")
175232

176-
return token
233+
return token
234+
235+
except Exception as e:
236+
self.debug({
237+
"type": "token_fetch_error",
238+
"agent_id": self.agent_id,
239+
"error": str(e)
240+
})
241+
raise
177242

178243
def _convert_wss_to_https(self, origin: str) -> str:
179244
"""Convert WSS origin to HTTPS for API calls."""

0 commit comments

Comments
 (0)